fix: workflow async concurrency (#13769)

* executeAsync

* || 1

* wip

* stepId

* stepId

* wip

* wip

* continue versioning management changes

* fix and improve concurrency

* update in memory engine

* remove duplicated test

* fix script

* Create weak-drinks-confess.md

* fixes

* fix

* fix

* continuation

* centralize merge checkepoint

* centralize merge checkpoint

* fix locking

* rm only

* Continue improvements and fixes

* fixes

* fixes

* hasAwaiting will be recomputed

* fix orchestrator engine

* bump version on async parallel steps only

* mark as delivered fix

* changeset

* check partitions

* avoid saving when having parent step

* cart test

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
Adrien de Peretti
2025-10-20 15:29:19 +02:00
committed by GitHub
parent d97a60d3c1
commit 516f5a3896
31 changed files with 2712 additions and 1406 deletions
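
The squashed commits above boil down to a handful of concurrency fixes in the workflow orchestrator and the in-memory engine. The most central one is that the in-memory storage now guards each checkpoint key with a lock while a save is in flight, releasing it even if the save throws. Below is a minimal sketch of that guard pattern; CheckpointStore and save are illustrative names, not the actual Medusa classes or API.

// Simplified illustration of the per-key lock guard added to the storage setter.
// CheckpointStore and save are illustrative stand-ins.
class CheckpointStore {
  private storage: Record<string, unknown> = {}
  private isLocked: Map<string, boolean> = new Map()

  async save(key: string, data: unknown): Promise<unknown> {
    if (this.isLocked.has(key)) {
      // Another save for the same transaction key is already in flight.
      throw new Error("Transaction storage is locked")
    }

    this.isLocked.set(key, true)
    try {
      // ... merge with the previously stored checkpoint, persist to DB, etc.
      this.storage[key] = data
      return data
    } finally {
      // Always release the lock, even when the save throws.
      this.isLocked.delete(key)
    }
  }
}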

View File

@@ -100,7 +100,7 @@ type Subscribers = Map<WorkflowId, TransactionSubscribers>
const AnySubscriber = "any"
export class WorkflowOrchestratorService {
private subscribers: Subscribers = new Map()
private static subscribers: Subscribers = new Map()
private container_: MedusaContainer
private inMemoryDistributedTransactionStorage_: InMemoryDistributedTransactionStorage
readonly #logger: Logger
@@ -110,7 +110,6 @@ export class WorkflowOrchestratorService {
sharedContainer,
}: {
inMemoryDistributedTransactionStorage: InMemoryDistributedTransactionStorage
workflowOrchestratorService: WorkflowOrchestratorService
sharedContainer: MedusaContainer
}) {
this.container_ = sharedContainer
@@ -133,9 +132,18 @@ export class WorkflowOrchestratorService {
await this.inMemoryDistributedTransactionStorage_.onApplicationShutdown()
}
private async triggerParentStep(transaction, result) {
private async triggerParentStep(transaction, result, errors) {
const metadata = transaction.flow.metadata
const { parentStepIdempotencyKey } = metadata ?? {}
const { parentStepIdempotencyKey, cancelingFromParentStep } = metadata ?? {}
if (cancelingFromParentStep) {
/**
* If the sub workflow is cancelling from a parent step, we don't want to trigger the parent
* step.
*/
return
}
if (parentStepIdempotencyKey) {
const hasFailed = [
TransactionState.REVERTED,
@@ -145,12 +153,18 @@ export class WorkflowOrchestratorService {
if (hasFailed) {
await this.setStepFailure({
idempotencyKey: parentStepIdempotencyKey,
stepResponse: result,
stepResponse: errors,
options: {
logOnError: true,
},
})
} else {
await this.setStepSuccess({
idempotencyKey: parentStepIdempotencyKey,
stepResponse: result,
options: {
logOnError: true,
},
})
}
}
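
The hunks above change how a nested (sub) workflow resolves its parent step: failures now propagate the collected errors rather than the result, successes call setStepSuccess, and a sub workflow that is cancelling from its parent step no longer resolves that step at all. A condensed, illustrative version of that branch follows; the types, the lowercase state strings, and the orchestrator parameter are simplified stand-ins rather than the real signatures.

// Condensed, illustrative sketch of the parent-step trigger.
type StepResolution = { idempotencyKey: string; stepResponse: unknown }

async function triggerParentStep(
  transaction: { flow: { state: string; metadata?: Record<string, any> } },
  result: unknown,
  errors: unknown[],
  orchestrator: {
    setStepSuccess: (input: StepResolution) => Promise<void>
    setStepFailure: (input: StepResolution) => Promise<void>
  }
): Promise<void> {
  const { parentStepIdempotencyKey, cancelingFromParentStep } =
    transaction.flow.metadata ?? {}

  // A sub workflow being cancelled from its parent step must not resolve
  // the parent step again, and flows without a parent have nothing to do.
  if (cancelingFromParentStep || !parentStepIdempotencyKey) {
    return
  }

  const hasFailed = ["reverted", "failed"].includes(transaction.flow.state)

  if (hasFailed) {
    // Failures surface the collected errors, not the result.
    await orchestrator.setStepFailure({
      idempotencyKey: parentStepIdempotencyKey,
      stepResponse: errors,
    })
  } else {
    await orchestrator.setStepSuccess({
      idempotencyKey: parentStepIdempotencyKey,
      stepResponse: result,
    })
  }
}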
@@ -237,7 +251,7 @@ export class WorkflowOrchestratorService {
errors,
})
await this.triggerParentStep(ret.transaction, result)
await this.triggerParentStep(ret.transaction, result, errors)
}
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
@@ -349,7 +363,7 @@ export class WorkflowOrchestratorService {
errors,
})
await this.triggerParentStep(ret.transaction, result)
await this.triggerParentStep(ret.transaction, result, errors)
}
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
@@ -443,7 +457,7 @@ export class WorkflowOrchestratorService {
errors,
})
await this.triggerParentStep(ret.transaction, result)
await this.triggerParentStep(ret.transaction, result, errors)
}
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
@@ -514,7 +528,7 @@ export class WorkflowOrchestratorService {
errors,
})
await this.triggerParentStep(ret.transaction, result)
await this.triggerParentStep(ret.transaction, result, errors)
}
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
@@ -587,7 +601,7 @@ export class WorkflowOrchestratorService {
errors,
})
await this.triggerParentStep(ret.transaction, result)
await this.triggerParentStep(ret.transaction, result, errors)
}
if (throwOnError && (ret.thrownError || ret.errors?.length)) {
@@ -608,7 +622,8 @@ export class WorkflowOrchestratorService {
subscriberId,
}: SubscribeOptions) {
subscriber._id = subscriberId
const subscribers = this.subscribers.get(workflowId) ?? new Map()
const subscribers =
WorkflowOrchestratorService.subscribers.get(workflowId) ?? new Map()
const handlerIndex = (handlers) => {
return handlers.findIndex(
@@ -625,7 +640,7 @@ export class WorkflowOrchestratorService {
transactionSubscribers.push(subscriber)
subscribers.set(transactionId, transactionSubscribers)
this.subscribers.set(workflowId, subscribers)
WorkflowOrchestratorService.subscribers.set(workflowId, subscribers)
return
}
@@ -637,7 +652,7 @@ export class WorkflowOrchestratorService {
workflowSubscribers.push(subscriber)
subscribers.set(AnySubscriber, workflowSubscribers)
this.subscribers.set(workflowId, subscribers)
WorkflowOrchestratorService.subscribers.set(workflowId, subscribers)
}
unsubscribe({
@@ -645,7 +660,8 @@ export class WorkflowOrchestratorService {
transactionId,
subscriberOrId,
}: UnsubscribeOptions) {
const subscribers = this.subscribers.get(workflowId) ?? new Map()
const subscribers =
WorkflowOrchestratorService.subscribers.get(workflowId) ?? new Map()
const filterSubscribers = (handlers: SubscriberHandler[]) => {
return handlers.filter((handler) => {
@@ -665,7 +681,7 @@ export class WorkflowOrchestratorService {
} else {
subscribers.delete(transactionId)
}
this.subscribers.set(workflowId, subscribers)
WorkflowOrchestratorService.subscribers.set(workflowId, subscribers)
return
}
@@ -676,7 +692,7 @@ export class WorkflowOrchestratorService {
} else {
subscribers.delete(AnySubscriber)
}
this.subscribers.set(workflowId, subscribers)
WorkflowOrchestratorService.subscribers.set(workflowId, subscribers)
}
private notify(options: NotifyOptions) {
@@ -687,7 +703,7 @@ export class WorkflowOrchestratorService {
private async processSubscriberNotifications(options: NotifyOptions) {
const { workflowId, transactionId, eventType } = options
const subscribers: TransactionSubscribers =
this.subscribers.get(workflowId) ?? new Map()
WorkflowOrchestratorService.subscribers.get(workflowId) ?? new Map()
const notifySubscribersAsync = async (handlers: SubscriberHandler[]) => {
const promises = handlers.map(async (handler) => {

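The other notable change in this file is that the subscriber registry becomes a static class member, so every orchestrator instance reads and writes the same map and notifications reach subscribers regardless of which instance finished the transaction. A minimal sketch of that shape; Orchestrator, SubscriberHandler, and subscribe are simplified illustrations, not the real service surface.

// Simplified: subscribers keyed by workflow id, shared across all instances.
type SubscriberHandler = ((event: unknown) => void | Promise<void>) & { _id?: string }
type TransactionSubscribers = Map<string, SubscriberHandler[]>

class Orchestrator {
  // Static, so concurrently created instances notify the same registry.
  private static subscribers: Map<string, TransactionSubscribers> = new Map()

  subscribe(workflowId: string, transactionId: string, handler: SubscriberHandler): void {
    const byTransaction =
      Orchestrator.subscribers.get(workflowId) ?? new Map<string, SubscriberHandler[]>()
    const handlers = byTransaction.get(transactionId) ?? []
    handlers.push(handler)
    byTransaction.set(transactionId, handlers)
    Orchestrator.subscribers.set(workflowId, byTransaction)
  }
}
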
View File

@@ -20,10 +20,10 @@ import {
ModulesSdkTypes,
} from "@medusajs/framework/types"
import {
isPresent,
MedusaError,
TransactionState,
TransactionStepState,
isPresent,
} from "@medusajs/framework/utils"
import { WorkflowOrchestratorService } from "@services"
import { type CronExpression, parseExpression } from "cron-parser"
@@ -31,6 +31,23 @@ import { WorkflowExecution } from "../models/workflow-execution"
const THIRTY_MINUTES_IN_MS = 1000 * 60 * 30
const doneStates = [
TransactionStepState.DONE,
TransactionStepState.REVERTED,
TransactionStepState.FAILED,
TransactionStepState.SKIPPED,
TransactionStepState.SKIPPED_FAILURE,
TransactionStepState.TIMEOUT,
]
const finishedStates = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.REVERTED,
]
const failedStates = [TransactionState.FAILED, TransactionState.REVERTED]
function calculateDelayFromExpression(expression: CronExpression): number {
const nextTime = expression.next().getTime()
const now = Date.now()
@@ -71,24 +88,6 @@ function parseNextExecution(
return result
}
const invokingStatesSet = new Set([
TransactionStepState.INVOKING,
TransactionStepState.NOT_STARTED,
])
const compensatingStatesSet = new Set([
TransactionStepState.COMPENSATING,
TransactionStepState.NOT_STARTED,
])
function isInvokingState(step: TransactionStep) {
return invokingStatesSet.has(step.invoke?.state)
}
function isCompensatingState(step: TransactionStep) {
return compensatingStatesSet.has(step.compensate?.state)
}
export class InMemoryDistributedTransactionStorage
implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{
@@ -96,8 +95,7 @@ export class InMemoryDistributedTransactionStorage
private logger_: Logger
private workflowOrchestratorService_: WorkflowOrchestratorService
private storage: Map<string, Omit<TransactionCheckpoint, "context">> =
new Map()
private storage: Record<string, TransactionCheckpoint> = {}
private scheduled: Map<
string,
{
@@ -112,6 +110,7 @@ export class InMemoryDistributedTransactionStorage
private pendingTimers: Set<NodeJS.Timeout> = new Set()
private clearTimeout_: NodeJS.Timeout
private isLocked: Map<string, boolean> = new Map()
constructor({
workflowExecutionService,
@@ -179,29 +178,11 @@ export class InMemoryDistributedTransactionStorage
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isFinished = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
const asyncVersion = data.flow._v
const isFinished = finishedStates.includes(data.flow.state)
const isWaitingToCompensate =
data.flow.state === TransactionState.WAITING_TO_COMPENSATE
/**
* Bit of explanation:
*
* When a workflow runs, it runs all sync steps in memory until it reaches an async step.
* At that point it might hand over to another process to continue the execution. That's why
* we need to save the current state of the flow. From there, it will again run all
* sync steps until the next async step, and so on.
*
* To summarize, we only truly need to save the data when we are reaching a step that
* triggers a handover to a potentially different process.
*
* This allows us to spare some resources and time by not over-communicating with the
* external database when it is not really needed.
*/
const isFlowInvoking = data.flow.state === TransactionState.INVOKING
const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
@@ -240,7 +221,8 @@ export class InMemoryDistributedTransactionStorage
if (
!(isNotStarted || isFinished || isWaitingToCompensate) &&
!currentStepsIsAsync
!currentStepsIsAsync &&
!asyncVersion
) {
return
}
@@ -295,15 +277,14 @@ export class InMemoryDistributedTransactionStorage
.catch(() => undefined)
if (trx) {
const { flow, errors } = this.storage.get(key) ?? {}
const { flow, errors } = this.storage[key]
? JSON.parse(JSON.stringify(this.storage[key]))
: {}
const { idempotent } = options ?? {}
const execution = trx.execution as TransactionFlow
if (!idempotent) {
const isFailedOrReverted = [
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(execution.state)
const isFailedOrReverted = failedStates.includes(execution.state)
const isDone = execution.state === TransactionState.DONE
@@ -321,11 +302,11 @@ export class InMemoryDistributedTransactionStorage
}
}
return {
flow: flow ?? (trx.execution as TransactionFlow),
context: trx.context?.data as TransactionContext,
errors: errors ?? (trx.context?.errors as TransactionStepError[]),
}
return new TransactionCheckpoint(
flow ?? (trx?.execution as TransactionFlow),
trx?.context?.data as TransactionContext,
errors ?? (trx?.context?.errors as TransactionStepError[])
)
}
return
@@ -336,68 +317,94 @@ export class InMemoryDistributedTransactionStorage
data: TransactionCheckpoint,
ttl?: number,
options?: TransactionOptions
): Promise<void> {
/**
* Store the retention time only if the transaction is done, failed or reverted.
* From that moment, this tuple can later be archived or deleted after the retention time.
*/
const hasFinished = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
const { retentionTime } = options ?? {}
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
// Only store retention time if it's provided
if (retentionTime) {
Object.assign(data, {
retention_time: retentionTime,
})
): Promise<TransactionCheckpoint> {
if (this.isLocked.has(key)) {
throw new Error("Transaction storage is locked")
}
// Store in memory
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isManualTransactionId = !data.flow.transactionId.startsWith("auto-")
this.isLocked.set(key, true)
if (isNotStarted && isManualTransactionId) {
const storedData = this.storage.get(key)
if (storedData) {
throw new SkipExecutionError(
"Transaction already started for transactionId: " +
data.flow.transactionId
)
try {
/**
* Store the retention time only if the transaction is done, failed or reverted.
* From that moment, this tuple can later be archived or deleted after the retention time.
*/
const { retentionTime } = options ?? {}
const hasFinished = finishedStates.includes(data.flow.state)
let cachedCheckpoint: TransactionCheckpoint | undefined
const getCheckpoint = async (options?: TransactionOptions) => {
if (!cachedCheckpoint) {
cachedCheckpoint = await this.get(key, options)
}
return cachedCheckpoint
}
}
const { flow, errors } = data
this.storage.set(key, {
flow,
errors,
})
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
getCheckpoint,
})
// Optimize DB operations - only perform when necessary
if (hasFinished) {
if (!retentionTime) {
// If the workflow is nested, we can't just remove it because that would break the compensation algorithm. Instead, it will get deleted when the top-level parent is deleted.
if (!flow.metadata?.parentStepIdempotencyKey) {
await this.deleteFromDb(data)
// Only store retention time if it's provided
if (retentionTime) {
Object.assign(data, {
retention_time: retentionTime,
})
}
// Store in memory
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isManualTransactionId = !data.flow.transactionId.startsWith("auto-")
if (isNotStarted && isManualTransactionId) {
const storedData = this.storage[key]
if (storedData) {
throw new SkipExecutionError(
"Transaction already started for transactionId: " +
data.flow.transactionId
)
}
}
if (data.flow._v) {
const storedData = await this.get(key, {
isCancelling: !!data.flow.cancelledAt,
} as any)
TransactionCheckpoint.mergeCheckpoints(data, storedData)
}
const { flow, errors } = data
this.storage[key] = {
flow,
context: {} as TransactionContext,
errors,
} as TransactionCheckpoint
// Optimize DB operations - only perform when necessary
if (hasFinished) {
if (!retentionTime) {
if (!flow.metadata?.parentStepIdempotencyKey) {
await this.deleteFromDb(data)
} else {
await this.saveToDb(data, retentionTime)
}
} else {
await this.saveToDb(data, retentionTime)
}
delete this.storage[key]
} else {
await this.saveToDb(data, retentionTime)
}
this.storage.delete(key)
} else {
await this.saveToDb(data, retentionTime)
return data
} finally {
this.isLocked.delete(key)
}
}
@@ -405,28 +412,25 @@ export class InMemoryDistributedTransactionStorage
data,
key,
options,
getCheckpoint,
}: {
data: TransactionCheckpoint
key: string
options?: TransactionOptions
getCheckpoint: (
options: TransactionOptions
) => Promise<TransactionCheckpoint | undefined>
}) {
// TODO: we have previously tried to replace this entire function
// with a locking-first approach. We might come back to that another time.
// It would remove the need for all the logic below that prevents race conditions
// by preventing the exact same execution from running at the same time.
// See early commits from: https://github.com/medusajs/medusa/pull/13345/commits
const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes(
data.flow.state
)
/**
* In case many execution can succeed simultaneously, we need to ensure that the latest
* execution does continue if a previous execution is considered finished
*/
const currentFlow = data.flow
const rawData = this.storage.get(key)
const rawData = this.storage[key]
let data_ = {} as TransactionCheckpoint
if (rawData) {
data_ = rawData as TransactionCheckpoint
@@ -437,13 +441,37 @@ export class InMemoryDistributedTransactionStorage
} as Parameters<typeof this.get>[1]
data_ =
(await this.get(key, getOptions)) ??
(await getCheckpoint(getOptions as TransactionOptions)) ??
({ flow: {} } as TransactionCheckpoint)
}
const { flow: latestUpdatedFlow } = data_
if (options?.stepId) {
const stepId = options.stepId
const currentStep = data.flow.steps[stepId]
const latestStep = latestUpdatedFlow.steps?.[stepId]
if (latestStep && currentStep) {
const isCompensating = data.flow.state === TransactionState.COMPENSATING
if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) {
const latestState = isCompensating
? latestStep.compensate?.state
: latestStep.invoke?.state
const shouldSkip = doneStates.includes(latestState)
if (shouldSkip) {
throw new SkipStepAlreadyFinishedError(
`Step ${stepId} already finished by another execution`
)
}
}
}
if (
!isInitialCheckpoint &&
!isPresent(latestUpdatedFlow) &&
!data.flow.metadata?.parentStepIdempotencyKey
) {
/**
* The initial checkpoint expects no other checkpoint to have been stored.
* In case it is not the initial one and another checkpoint is trying to
@@ -453,54 +481,7 @@ export class InMemoryDistributedTransactionStorage
throw new SkipExecutionError("Already finished by another execution")
}
let currentFlowLatestExecutedStep: TransactionStep | undefined
const currentFlowSteps = Object.values(currentFlow.steps || {})
for (let i = currentFlowSteps.length - 1; i >= 0; i--) {
if (currentFlowSteps[i].lastAttempt) {
currentFlowLatestExecutedStep = currentFlowSteps[i]
break
}
}
let latestUpdatedFlowLatestExecutedStep: TransactionStep | undefined
const latestUpdatedFlowSteps = Object.values(latestUpdatedFlow.steps || {})
for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) {
if (latestUpdatedFlowSteps[i].lastAttempt) {
latestUpdatedFlowLatestExecutedStep = latestUpdatedFlowSteps[i]
break
}
}
/**
* The current flow and the latest updated flow have the same latest executed step.
*/
const isSameLatestExecutedStep =
currentFlowLatestExecutedStep &&
latestUpdatedFlowLatestExecutedStep &&
currentFlowLatestExecutedStep?.id ===
latestUpdatedFlowLatestExecutedStep?.id
/**
* The current flow's latest executed step has a last attempt ahead of the latest updated
* flow's latest executed step. Therefore it is fine; otherwise another execution has already
* finished the step.
*/
const isCurrentLatestExecutedStepLastAttemptAhead =
currentFlowLatestExecutedStep?.lastAttempt &&
latestUpdatedFlowLatestExecutedStep?.lastAttempt &&
currentFlowLatestExecutedStep.lastAttempt >=
latestUpdatedFlowLatestExecutedStep.lastAttempt
if (
isSameLatestExecutedStep &&
!isCurrentLatestExecutedStepLastAttemptAhead
) {
throw new SkipStepAlreadyFinishedError(
"Step already in execution ahead of the current one"
)
}
// First ensure that the latest execution was not cancelled, otherwise we skip the execution
// Ensure that the latest execution was not cancelled, otherwise we skip the execution
const latestTransactionCancelledAt = latestUpdatedFlow.cancelledAt
const currentTransactionCancelledAt = currentFlow.cancelledAt
@@ -512,86 +493,6 @@ export class InMemoryDistributedTransactionStorage
"Workflow execution has been cancelled during the execution"
)
}
const currentFlowLastInvokingStepIndex =
currentFlowSteps.findIndex(isInvokingState)
let latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps
? 1 // There is no other execution, so the current execution is the latest
: -1
if (latestUpdatedFlow.steps) {
for (let i = 0; i < latestUpdatedFlowSteps.length; i++) {
if (isInvokingState(latestUpdatedFlowSteps[i])) {
latestUpdatedFlowLastInvokingStepIndex = i
break
}
}
}
let currentFlowLastCompensatingStepIndex = -1
for (let i = currentFlowSteps.length - 1; i >= 0; i--) {
if (isCompensatingState(currentFlowSteps[i])) {
currentFlowLastCompensatingStepIndex = currentFlowSteps.length - 1 - i
break
}
}
let latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps
? -1 // There is no other execution, so the current execution is the latest
: -1
if (latestUpdatedFlow.steps) {
for (let i = latestUpdatedFlowSteps.length - 1; i >= 0; i--) {
if (isCompensatingState(latestUpdatedFlowSteps[i])) {
latestUpdatedFlowLastCompensatingStepIndex =
latestUpdatedFlowSteps.length - 1 - i
break
}
}
}
const isLatestExecutionFinishedIndex = -1
const invokeShouldBeSkipped =
(latestUpdatedFlowLastInvokingStepIndex ===
isLatestExecutionFinishedIndex ||
currentFlowLastInvokingStepIndex <
latestUpdatedFlowLastInvokingStepIndex) &&
currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex
const compensateShouldBeSkipped =
currentFlowLastCompensatingStepIndex <
latestUpdatedFlowLastCompensatingStepIndex &&
currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex &&
latestUpdatedFlowLastCompensatingStepIndex !==
isLatestExecutionFinishedIndex
const isCompensatingMismatch =
latestUpdatedFlow.state === TransactionState.COMPENSATING &&
![TransactionState.REVERTED, TransactionState.FAILED].includes(
currentFlow.state
) &&
currentFlow.state !== latestUpdatedFlow.state
const isRevertedMismatch =
latestUpdatedFlow.state === TransactionState.REVERTED &&
currentFlow.state !== TransactionState.REVERTED
const isFailedMismatch =
latestUpdatedFlow.state === TransactionState.FAILED &&
currentFlow.state !== TransactionState.FAILED
if (
(data.flow.state !== TransactionState.COMPENSATING &&
invokeShouldBeSkipped) ||
(data.flow.state === TransactionState.COMPENSATING &&
compensateShouldBeSkipped) ||
isCompensatingMismatch ||
isRevertedMismatch ||
isFailedMismatch
) {
throw new SkipExecutionError("Already finished by another execution")
}
}
async scheduleRetry(
@@ -850,7 +751,7 @@ export class InMemoryDistributedTransactionStorage
updated_at: {
$lte: raw(
(alias) =>
`CURRENT_TIMESTAMP - (INTERVAL '1 second' * retention_time)`
`CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")`
),
},
state: {
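
Finally, elsewhere in the storage changes, flows that fan out into async parallel steps carry a version (flow._v, per the "bump version on async parallel steps only" commit). When it is set, the incoming checkpoint is merged with the latest stored one rather than replacing it, and the stored checkpoint is fetched lazily through a cached getCheckpoint helper so it is read at most once per save. A rough sketch under those assumptions; fetchStored and merge are hypothetical helpers standing in for the real storage access and TransactionCheckpoint.mergeCheckpoints.

// Illustrative only: lazy, cached lookup of the stored checkpoint plus a merge
// for versioned (async parallel) flows. fetchStored and merge are hypothetical.
type Checkpoint = {
  flow: { _v?: number; cancelledAt?: Date; [key: string]: unknown }
  errors?: unknown[]
}

async function saveVersionedCheckpoint(
  key: string,
  data: Checkpoint,
  fetchStored: (key: string, options?: { isCancelling?: boolean }) => Promise<Checkpoint | undefined>,
  merge: (incoming: Checkpoint, stored?: Checkpoint) => void
): Promise<Checkpoint> {
  let cached: Checkpoint | undefined

  // Fetch the stored checkpoint at most once per save, mirroring the
  // getCheckpoint helper introduced in the diff.
  const getCheckpoint = async () => {
    if (!cached) {
      cached = await fetchStored(key, { isCancelling: !!data.flow.cancelledAt })
    }
    return cached
  }

  if (data.flow._v) {
    // Versioned flows merge with the latest stored checkpoint instead of
    // overwriting it, so concurrent branches do not clobber each other.
    merge(data, await getCheckpoint())
  }

  return data
}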