Chore/orchestration storage improvements (#12178)

**What**
Cleanup and improve workflow storage utility
This commit is contained in:
Adrien de Peretti
2025-04-18 10:35:23 +02:00
committed by GitHub
parent fe74e77a7a
commit 28958b2e26
3 changed files with 228 additions and 164 deletions

View File

@@ -173,19 +173,27 @@ export class InMemoryDistributedTransactionStorage
options,
})
Object.assign(data, {
retention_time: retentionTime,
})
this.storage.set(key, data)
if (hasFinished && !retentionTime && !idempotent) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data, retentionTime)
// Only store retention time if it's provided
if (retentionTime) {
Object.assign(data, {
retention_time: retentionTime,
})
}
// Store in memory
this.storage.set(key, data)
// Optimize DB operations - only perform when necessary
if (hasFinished) {
if (!retentionTime && !idempotent) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data, retentionTime)
}
this.storage.delete(key)
} else {
await this.saveToDb(data, retentionTime)
}
}
@@ -198,11 +206,7 @@ export class InMemoryDistributedTransactionStorage
key: string
options?: TransactionOptions
}) {
let isInitialCheckpoint = false
if (data.flow.state === TransactionState.NOT_STARTED) {
isInitialCheckpoint = true
}
const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED
/**
* In case many execution can succeed simultaneously, we need to ensure that the latest
@@ -223,49 +227,45 @@ export class InMemoryDistributedTransactionStorage
throw new SkipExecutionError("Already finished by another execution")
}
const currentFlowLastInvokingStepIndex = Object.values(
currentFlow.steps
).findIndex((step) => {
return [
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
].includes(step.invoke?.state)
})
const currentFlowSteps = Object.values(currentFlow.steps || {})
const latestUpdatedFlowSteps = latestUpdatedFlow.steps
? Object.values(
latestUpdatedFlow.steps as Record<string, TransactionStep>
)
: []
// Predefined states for quick lookup
const invokingStates = [
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
]
const compensatingStates = [
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
]
const isInvokingState = (step: TransactionStep) =>
invokingStates.includes(step.invoke?.state)
const isCompensatingState = (step: TransactionStep) =>
compensatingStates.includes(step.compensate?.state)
const currentFlowLastInvokingStepIndex =
currentFlowSteps.findIndex(isInvokingState)
const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
? 1 // There is no other execution, so the current execution is the latest
: Object.values(
(latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
).findIndex((step) => {
return [
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
].includes(step.invoke?.state)
})
: latestUpdatedFlowSteps.findIndex(isInvokingState)
const currentFlowLastCompensatingStepIndex = Object.values(
currentFlow.steps
)
.reverse()
.findIndex((step) => {
return [
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
].includes(step.compensate?.state)
})
const reversedCurrentFlowSteps = [...currentFlowSteps].reverse()
const currentFlowLastCompensatingStepIndex =
reversedCurrentFlowSteps.findIndex(isCompensatingState)
const reversedLatestUpdatedFlowSteps = [...latestUpdatedFlowSteps].reverse()
const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
? -1 // There is no other execution, so the current execution is the latest
: Object.values(
(latestUpdatedFlow.steps as Record<string, TransactionStep>) ?? {}
)
.reverse()
.findIndex((step) => {
return [
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
].includes(step.compensate?.state)
})
: reversedLatestUpdatedFlowSteps.findIndex(isCompensatingState)
const isLatestExecutionFinishedIndex = -1
const invokeShouldBeSkipped =
@@ -282,20 +282,29 @@ export class InMemoryDistributedTransactionStorage
latestUpdatedFlowLastCompensatingStepIndex !==
isLatestExecutionFinishedIndex
const isCompensatingMismatch =
latestUpdatedFlow.state === TransactionState.COMPENSATING &&
![TransactionState.REVERTED, TransactionState.FAILED].includes(
currentFlow.state
) &&
currentFlow.state !== latestUpdatedFlow.state
const isRevertedMismatch =
latestUpdatedFlow.state === TransactionState.REVERTED &&
currentFlow.state !== TransactionState.REVERTED
const isFailedMismatch =
latestUpdatedFlow.state === TransactionState.FAILED &&
currentFlow.state !== TransactionState.FAILED
if (
(data.flow.state !== TransactionState.COMPENSATING &&
invokeShouldBeSkipped) ||
(data.flow.state === TransactionState.COMPENSATING &&
compensateShouldBeSkipped) ||
(latestUpdatedFlow.state === TransactionState.COMPENSATING &&
![TransactionState.REVERTED, TransactionState.FAILED].includes(
currentFlow.state
) &&
currentFlow.state !== latestUpdatedFlow.state) ||
(latestUpdatedFlow.state === TransactionState.REVERTED &&
currentFlow.state !== TransactionState.REVERTED) ||
(latestUpdatedFlow.state === TransactionState.FAILED &&
currentFlow.state !== TransactionState.FAILED)
isCompensatingMismatch ||
isRevertedMismatch ||
isFailedMismatch
) {
throw new SkipExecutionError("Already finished by another execution")
}
@@ -428,12 +437,13 @@ export class InMemoryDistributedTransactionStorage
typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId
// In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one
// any only then we add the new one.
await this.remove(jobId)
let expression: CronExpression | number
let nextExecution = parseNextExecution(schedulerOptions)
if ("cron" in schedulerOptions) {
// Cache the parsed expression to avoid repeated parsing
expression = parseExpression(schedulerOptions.cron)
} else if ("interval" in schedulerOptions) {
expression = schedulerOptions.interval
@@ -448,6 +458,9 @@ export class InMemoryDistributedTransactionStorage
this.jobHandler(jobId)
}, nextExecution)
// Set the timer's unref to prevent it from keeping the process alive
timer.unref()
this.scheduled.set(jobId, {
timer,
expression,
@@ -488,23 +501,28 @@ export class InMemoryDistributedTransactionStorage
const nextExecution = parseNextExecution(job.expression)
const timer = setTimeout(async () => {
this.jobHandler(jobId)
}, nextExecution)
this.scheduled.set(jobId, {
timer,
expression: job.expression,
numberOfExecutions: (job.numberOfExecutions ?? 0) + 1,
config: job.config,
})
try {
// With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported.
await this.workflowOrchestratorService_.run(jobId, {
logOnError: true,
throwOnError: false,
})
// Only schedule the next job execution after the current one completes successfully
const timer = setTimeout(async () => {
setImmediate(() => {
this.jobHandler(jobId)
})
}, nextExecution)
// Prevent timer from keeping the process alive
timer.unref()
this.scheduled.set(jobId, {
timer,
expression: job.expression,
numberOfExecutions: (job.numberOfExecutions ?? 0) + 1,
config: job.config,
})
} catch (e) {
if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) {
this.logger_?.warn(