feat: run nested async workflows (#9119)

This commit is contained in:
Carlos R. L. Rodrigues
2024-09-16 10:06:45 -03:00
committed by GitHub
parent 0bcdcccbe2
commit ef8dc4087e
23 changed files with 295 additions and 100 deletions

View File

@@ -12,7 +12,12 @@ import {
Logger,
MedusaContainer,
} from "@medusajs/types"
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
import {
InjectSharedContext,
MedusaContext,
TransactionState,
isString,
} from "@medusajs/utils"
import {
FlowRunOptions,
MedusaWorkflow,
@@ -146,6 +151,29 @@ export class WorkflowOrchestratorService {
await this.redisDistributedTransactionStorage_.onApplicationStart()
}
private async triggerParentStep(transaction, result) {
const metadata = transaction.flow.metadata
const { parentStepIdempotencyKey } = metadata ?? {}
if (parentStepIdempotencyKey) {
const hasFailed = [
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(transaction.flow.state)
if (hasFailed) {
await this.setStepFailure({
idempotencyKey: parentStepIdempotencyKey,
stepResponse: result,
})
} else {
await this.setStepSuccess({
idempotencyKey: parentStepIdempotencyKey,
stepResponse: result,
})
}
}
}
@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
@@ -180,16 +208,12 @@ export class WorkflowOrchestratorService {
transactionId: context.transactionId,
})
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
const exportedWorkflow = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const ret = await flow.run({
const ret = await exportedWorkflow.run({
input,
throwOnError,
logOnError,
@@ -198,14 +222,25 @@ export class WorkflowOrchestratorService {
events,
})
// TODO: temporary
const hasFinished = ret.transaction.hasFinished()
const metadata = ret.transaction.getFlow().metadata
const { parentStepIdempotencyKey } = metadata ?? {}
const hasFailed = [
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(ret.transaction.getFlow().state)
const acknowledgement = {
transactionId: context.transactionId,
workflowId: workflowId,
parentStepIdempotencyKey,
hasFinished,
hasFailed,
}
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
@@ -213,6 +248,8 @@ export class WorkflowOrchestratorService {
result,
errors,
})
await this.triggerParentStep(ret.transaction, result)
}
return { acknowledgement, ...ret }
@@ -243,12 +280,11 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
const transaction = await exportedWorkflow.getRunningTransaction(
transactionId,
context
)
const transaction = await flow.getRunningTransaction(transactionId, context)
return transaction
}
@@ -282,17 +318,13 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
transactionId,
workflowId,
})
const ret = await flow.registerStepSuccess({
const ret = await exportedWorkflow.registerStepSuccess({
idempotencyKey: idempotencyKey_,
context,
resultFrom,
@@ -304,6 +336,7 @@ export class WorkflowOrchestratorService {
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
@@ -311,6 +344,8 @@ export class WorkflowOrchestratorService {
result,
errors,
})
await this.triggerParentStep(ret.transaction, result)
}
return ret
@@ -368,6 +403,7 @@ export class WorkflowOrchestratorService {
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
@@ -375,6 +411,8 @@ export class WorkflowOrchestratorService {
result,
errors,
})
await this.triggerParentStep(ret.transaction, result)
}
return ret

View File

@@ -66,16 +66,16 @@ export class RedisDistributedTransactionStorage
}
async onApplicationStart() {
const allowedJobs = [
JobType.RETRY,
JobType.STEP_TIMEOUT,
JobType.TRANSACTION_TIMEOUT,
]
this.worker = new Worker(
this.queueName,
async (job) => {
const allJobs = [
JobType.RETRY,
JobType.STEP_TIMEOUT,
JobType.TRANSACTION_TIMEOUT,
]
if (allJobs.includes(job.name as JobType)) {
if (allowedJobs.includes(job.name as JobType)) {
await this.executeTransaction(
job.data.workflowId,
job.data.transactionId