chore(index): Few adjustments (#12557)
**What** - Adjust lock duration, it is in seconds and not in ms - Log on lock release - Renew lock separately from the other promises - Add more logs - Log when lock can't be acquired. It can be expected in case two processes try to sync the same entity and in that case it can be ignored. But at least it gives some information in case it happens for another reason - Log when the release lock failed, the lock will remain in the locking provider for 1 minute before being removed. But it wont prevent other entities to be synced
This commit is contained in:
committed by
GitHub
parent
22687d694e
commit
499381d119
5
.changeset/proud-carrots-punch.md
Normal file
5
.changeset/proud-carrots-punch.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/index": patch
|
||||
---
|
||||
|
||||
chore(index): Few adjustments
|
||||
@@ -1,5 +1,5 @@
|
||||
import { asValue } from "awilix"
|
||||
import { container } from "@medusajs/framework"
|
||||
import { container, logger } from "@medusajs/framework"
|
||||
import type { IndexTypes } from "@medusajs/types"
|
||||
import { Orchestrator } from "@utils"
|
||||
|
||||
@@ -12,7 +12,7 @@ function creatingFakeLockingModule() {
|
||||
}
|
||||
this.lockEntities.add(key)
|
||||
},
|
||||
release(key: string) {
|
||||
async release(key: string) {
|
||||
this.lockEntities.delete(key)
|
||||
},
|
||||
}
|
||||
@@ -55,6 +55,7 @@ describe("Orchestrator", () => {
|
||||
entities.map((e) => e.entity),
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -101,6 +102,7 @@ describe("Orchestrator", () => {
|
||||
entities.map((e) => e.entity),
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -150,6 +152,7 @@ describe("Orchestrator", () => {
|
||||
entityNames,
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -162,6 +165,7 @@ describe("Orchestrator", () => {
|
||||
entityNames,
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -169,16 +173,18 @@ describe("Orchestrator", () => {
|
||||
orchestrator.process(taskRunner),
|
||||
orchestrator1.process(taskRunner2),
|
||||
])
|
||||
expect(processedEntities).toEqual([
|
||||
{
|
||||
entity: "brand",
|
||||
owner: "instance-1",
|
||||
},
|
||||
{
|
||||
entity: "product",
|
||||
owner: "instance-2",
|
||||
},
|
||||
])
|
||||
expect(processedEntities).toEqual(
|
||||
expect.arrayContaining([
|
||||
{
|
||||
entity: "brand",
|
||||
owner: "instance-1",
|
||||
},
|
||||
{
|
||||
entity: "product",
|
||||
owner: "instance-2",
|
||||
},
|
||||
])
|
||||
)
|
||||
expect(lockingModule.lockEntities.size).toEqual(0)
|
||||
})
|
||||
|
||||
@@ -221,6 +227,7 @@ describe("Orchestrator", () => {
|
||||
entities.map((e) => e.entity),
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -269,6 +276,7 @@ describe("Orchestrator", () => {
|
||||
entities.map((e) => e.entity),
|
||||
{
|
||||
lockDuration: 60 * 1000,
|
||||
logger: logger,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -85,12 +85,13 @@ export class DataSynchronizer {
|
||||
fields: string
|
||||
fields_hash: string
|
||||
}[],
|
||||
lockDuration: number = 1000 * 60 * 5
|
||||
lockDuration: number = 60 // 1 minute
|
||||
) {
|
||||
this.#isReadyOrThrow()
|
||||
const entitiesToSync = entities.map((entity) => entity.entity)
|
||||
this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, {
|
||||
lockDuration,
|
||||
logger: this.#logger,
|
||||
})
|
||||
await this.#orchestrator.process(this.#taskRunner.bind(this))
|
||||
}
|
||||
@@ -156,26 +157,23 @@ export class DataSynchronizer {
|
||||
ack: async (ack) => {
|
||||
const endTime = performance.now()
|
||||
const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2)
|
||||
const promises: Promise<any>[] = []
|
||||
|
||||
if (ack.lastCursor) {
|
||||
this.#logger.debug(
|
||||
`[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)`
|
||||
)
|
||||
|
||||
promises.push(
|
||||
this.#indexSyncService.update({
|
||||
data: {
|
||||
last_key: ack.lastCursor,
|
||||
},
|
||||
selector: {
|
||||
entity,
|
||||
},
|
||||
})
|
||||
)
|
||||
await this.#indexSyncService.update({
|
||||
data: {
|
||||
last_key: ack.lastCursor,
|
||||
},
|
||||
selector: {
|
||||
entity,
|
||||
},
|
||||
})
|
||||
|
||||
if (!ack.done && !ack.err) {
|
||||
promises.push(this.#orchestrator.renewLock(entity))
|
||||
await this.#orchestrator.renewLock(entity)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,8 +190,6 @@ export class DataSynchronizer {
|
||||
)
|
||||
}
|
||||
|
||||
await promiseAll(promises)
|
||||
|
||||
chunkStartTime = performance.now()
|
||||
},
|
||||
})
|
||||
|
||||
@@ -139,7 +139,11 @@ export class Configuration {
|
||||
}
|
||||
|
||||
const changes = await this.#indexMetadataService.list({
|
||||
status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING],
|
||||
status: [
|
||||
IndexMetadataStatus.PENDING,
|
||||
IndexMetadataStatus.PROCESSING,
|
||||
IndexMetadataStatus.ERROR,
|
||||
],
|
||||
})
|
||||
|
||||
this.#logger.info(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { ILockingModule } from "@medusajs/types"
|
||||
import { ILockingModule, Logger } from "@medusajs/types"
|
||||
|
||||
export class Orchestrator {
|
||||
/**
|
||||
@@ -6,6 +6,11 @@ export class Orchestrator {
|
||||
*/
|
||||
#lockingModule: ILockingModule
|
||||
|
||||
/**
|
||||
* Reference to the logger
|
||||
*/
|
||||
#logger: Logger
|
||||
|
||||
/**
|
||||
* Owner id when acquiring locks
|
||||
*/
|
||||
@@ -74,11 +79,13 @@ export class Orchestrator {
|
||||
entities: string[],
|
||||
options: {
|
||||
lockDuration: number
|
||||
logger: Logger
|
||||
}
|
||||
) {
|
||||
this.#lockingModule = lockingModule
|
||||
this.#entities = entities
|
||||
this.#options = options
|
||||
this.#logger = options.logger
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -104,14 +111,14 @@ export class Orchestrator {
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the entity at a given index. If there are no entities
|
||||
* Processes the entity. If there are no entities
|
||||
* left, the orchestrator state will be set to completed.
|
||||
*
|
||||
* - Task runner is the implementation function to execute a task.
|
||||
* Orchestrator has no inbuilt execution logic and it relies on
|
||||
* the task runner for the same.
|
||||
*/
|
||||
async #processAtIndex(
|
||||
async #processEntity(
|
||||
taskRunner: (entity: string) => Promise<void>,
|
||||
entity: string
|
||||
) {
|
||||
@@ -123,10 +130,20 @@ export class Orchestrator {
|
||||
this.#state = "error"
|
||||
throw error
|
||||
} finally {
|
||||
await this.#lockingModule.release(entity, {
|
||||
ownerId: this.#lockingOwner,
|
||||
})
|
||||
await this.#lockingModule
|
||||
.release(entity, {
|
||||
ownerId: this.#lockingOwner,
|
||||
})
|
||||
.catch(() => {
|
||||
this.#logger.error(
|
||||
`[Index engine] failed to release lock for entity '${entity}'`
|
||||
)
|
||||
})
|
||||
}
|
||||
} else {
|
||||
this.#logger.warn(
|
||||
`[Index engine] failed to acquire lock for entity '${entity}' on pid ${process.pid}. It means another process is already processing this entity or a lock is still present in your locking provider.`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,7 +169,7 @@ export class Orchestrator {
|
||||
break
|
||||
}
|
||||
|
||||
await this.#processAtIndex(taskRunner, entity)
|
||||
await this.#processEntity(taskRunner, entity)
|
||||
}
|
||||
|
||||
this.#state = "completed"
|
||||
|
||||
Reference in New Issue
Block a user