feat: Add tasks orchestrator (#11161)

Fixes: FRMW-2885
This commit is contained in:
Harminder Virk
2025-01-27 18:13:49 +05:30
committed by GitHub
parent 864d772e34
commit 10962a5c54
2 changed files with 393 additions and 0 deletions

View File

@@ -0,0 +1,244 @@
import { asValue } from "awilix"
import { container } from "@medusajs/framework"
import type { IndexTypes } from "@medusajs/types"
import { Orchestrator } from "../../src/orchestrator"
function creatingFakeLockingModule() {
return {
lockEntities: new Set(),
acquire(key: string) {
if (this.lockEntities.has(key)) {
throw new Error("Lock already exists")
}
this.lockEntities.add(key)
},
release(key: string) {
this.lockEntities.delete(key)
},
}
}
describe("Orchestrator", () => {
test("process each entity via the task runner", async () => {
const processedEntities: string[] = []
const lockingModule = creatingFakeLockingModule()
const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [
{
entity: "brand",
alias: "brand",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
{
entity: "product",
alias: "product",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
]
container.register({
locking: asValue(lockingModule),
})
const orchestrator = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
expect(orchestrator.state).toEqual("processing")
processedEntities.push(entity.entity)
},
})
await orchestrator.process()
expect(lockingModule.lockEntities.size).toEqual(0)
expect(orchestrator.state).toEqual("completed")
expect(processedEntities).toEqual(["brand", "product"])
})
test("do not process tasks when unable to acquire lock", async () => {
const processedEntities: string[] = []
const lockingModule = creatingFakeLockingModule()
const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [
{
entity: "brand",
alias: "brand",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
{
entity: "product",
alias: "product",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
]
container.register({
locking: asValue({
...lockingModule,
acquire() {
throw new Error("Unable to acquire lock")
},
}),
})
const orchestrator = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
processedEntities.push(entity.entity)
},
})
await orchestrator.process()
expect(processedEntities).toEqual([])
})
test("share tasks between multiple instances", async () => {
const processedEntities: { owner: string; entity: string }[] = []
const lockingModule = creatingFakeLockingModule()
const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [
{
entity: "brand",
alias: "brand",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
{
entity: "product",
alias: "product",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
]
container.register({
locking: asValue(lockingModule),
})
const orchestrator = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
processedEntities.push({ entity: entity.entity, owner: "instance-1" })
},
})
const orchestrator1 = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
processedEntities.push({ entity: entity.entity, owner: "instance-2" })
},
})
await Promise.all([orchestrator.process(), orchestrator1.process()])
expect(processedEntities).toEqual([
{
entity: "brand",
owner: "instance-1",
},
{
entity: "product",
owner: "instance-2",
},
])
expect(lockingModule.lockEntities.size).toEqual(0)
})
test("stop processing when task runner throws error", async () => {
const processedEntities: string[] = []
const lockingModule = creatingFakeLockingModule()
const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [
{
entity: "brand",
alias: "brand",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
{
entity: "product",
alias: "product",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
]
container.register({
locking: asValue(lockingModule),
})
const orchestrator = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
if (entity.entity === "product") {
throw new Error("Cannot process")
}
processedEntities.push(entity.entity)
},
})
await expect(orchestrator.process()).rejects.toThrow("Cannot process")
expect(orchestrator.state).toEqual("error")
expect(processedEntities).toEqual(["brand"])
expect(lockingModule.lockEntities.size).toEqual(0)
})
test("throw error when the same instance is executed to process tasks parallely", async () => {
const processedEntities: string[] = []
const lockingModule = creatingFakeLockingModule()
const entities: IndexTypes.SchemaObjectEntityRepresentation[] = [
{
entity: "brand",
alias: "brand",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
{
entity: "product",
alias: "product",
fields: ["*"],
listeners: [],
moduleConfig: {},
parents: [],
},
]
container.register({
locking: asValue(lockingModule),
})
const orchestrator = new Orchestrator(container, entities, {
lockDuration: 60 * 1000,
async taskRunner(entity) {
expect(orchestrator.state).toEqual("processing")
processedEntities.push(entity.entity)
},
})
await expect(
Promise.all([orchestrator.process(), orchestrator.process()])
).rejects.toThrow("Cannot re-run an already running orchestrator instance")
expect(lockingModule.lockEntities.size).toEqual(0)
})
})

View File

@@ -0,0 +1,149 @@
import { ILockingModule, IndexTypes, MedusaContainer } from "@medusajs/types"
export class Orchestrator {
/**
* Reference to the locking module
*/
#lockingModule: ILockingModule
/**
* Owner id when acquiring locks
*/
#lockingOwner = `index-sync-${process.pid}`
/**
* The current state of the orchestrator
*
* - In "idle" state, one can call the "run" method.
* - In "processing" state, the orchestrator is looping over the entities
* and processing them.
* - In "completed" state, the provided entities have been processed.
* - The "error" state is set when the task runner throws an error.
*/
#state: "idle" | "processing" | "completed" | "error" = "idle"
/**
* Options for the locking module and the task runner to execute the
* task.
*
* - Lock duration is the maximum duration for which to hold the lock.
* After this the lock will be removed.
*
* - Task runner is the implementation function to execute a task.
* Orchestrator has no inbuilt execution logic and it relies on
* the task runner for the same.
*
* The entity is provided to the taskRunner only when the orchestrator
* is able to acquire a lock.
*/
#options: {
lockDuration: number
taskRunner: (
entity: IndexTypes.SchemaObjectEntityRepresentation
) => Promise<void>
}
/**
* Index of the entity that is currently getting processed.
*/
#currentIndex: number = 0
/**
* Collection of entities to process in sequence. A lock is obtained
* while an entity is getting synced to avoid multiple processes
* from syncing the same entity
*/
#entities: IndexTypes.SchemaObjectEntityRepresentation[] = []
/**
* The current state of the orchestrator
*/
get state() {
return this.#state
}
/**
* Reference to the currently processed entity
*/
get current() {
return this.#entities[this.#currentIndex]
}
/**
* Reference to the number of entities left for processing
*/
get remainingCount() {
return this.#entities.length - (this.#currentIndex + 1)
}
constructor(
container: MedusaContainer,
entities: IndexTypes.SchemaObjectEntityRepresentation[],
options: {
lockDuration: number
taskRunner: (
entity: IndexTypes.SchemaObjectEntityRepresentation
) => Promise<void>
}
) {
this.#lockingModule = container.resolve("locking")
this.#entities = entities
this.#options = options
}
/**
* Acquires using the lock module.
*/
async #acquireLock(forKey: string): Promise<boolean> {
try {
await this.#lockingModule.acquire(forKey, {
expire: this.#options.lockDuration,
ownerId: this.#lockingOwner,
})
return true
} catch {
return false
}
}
/**
* Processes the entity at a given index. If there are no entities
* left, the orchestrator state will be set to completed.
*/
async #processAtIndex(index: number) {
const entity = this.#entities[index]
if (!entity) {
this.#state = "completed"
return
}
this.#currentIndex = index
const lockAcquired = await this.#acquireLock(entity.entity)
if (lockAcquired) {
try {
await this.#options.taskRunner(entity)
} catch (error) {
this.#state = "error"
throw error
} finally {
await this.#lockingModule.release(entity.entity, {
ownerId: this.#lockingOwner,
})
}
}
return this.#processAtIndex(index + 1)
}
/**
* Run the orchestrator to process the entities one by one.
*/
async process() {
if (this.state !== "idle") {
throw new Error("Cannot re-run an already running orchestrator instance")
}
this.#state = "processing"
return this.#processAtIndex(0)
}
}