fix(workflow-engine-*): subscribe response and error (#6869)

This commit is contained in:
Carlos R. L. Rodrigues
2024-03-29 05:23:41 -03:00
committed by GitHub
parent d97af91a8d
commit e603726985
6 changed files with 67 additions and 43 deletions

View File

@@ -10,6 +10,7 @@ import {
FlowRunOptions,
MedusaWorkflow,
ReturnWorkflow,
resolveValue,
} from "@medusajs/workflows-sdk"
import Redis from "ioredis"
import { ulid } from "ulid"
@@ -512,30 +513,39 @@ export class WorkflowOrchestratorService {
await notify({ eventType: "onStepBegin", step })
},
onStepSuccess: async ({ step, transaction }) => {
const response = transaction.getContext().invoke[step.id]
const stepName = step.definition.action!
const response = await resolveValue(
transaction.getContext().invoke[stepName],
transaction
)
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
await notify({ eventType: "onStepSuccess", step, response })
},
onStepFailure: async ({ step, transaction }) => {
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)[
step.id
]
const stepName = step.definition.action!
const errors = transaction
.getErrors(TransactionHandlerType.INVOKE)
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onStepFailure", step, errors })
},
onCompensateStepSuccess: async ({ step, transaction }) => {
const response = transaction.getContext().compensate[step.id]
const stepName = step.definition.action!
const response = transaction.getContext().compensate[stepName]
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
await notify({ eventType: "onCompensateStepSuccess", step, response })
},
onCompensateStepFailure: async ({ step, transaction }) => {
const errors = transaction.getErrors(TransactionHandlerType.COMPENSATE)[
step.id
]
const stepName = step.definition.action!
const errors = transaction
.getErrors(TransactionHandlerType.COMPENSATE)
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onCompensateStepFailure", step, errors })

View File

@@ -99,22 +99,6 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
})
}
/*private stringifyWithSymbol(key, value) {
if (key === "__type" && typeof value === "symbol") {
return Symbol.keyFor(value)
}
return value
}
private jsonWithSymbol(key, value) {
if (key === "__type" && typeof value === "string") {
return Symbol.for(value)
}
return value
}*/
async get(key: string): Promise<TransactionCheckpoint | undefined> {
const data = await this.redisClient.get(key)
@@ -276,7 +260,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
const key = [type, transaction.modelId, transaction.transactionId]
if (step) {
key.push(step.id)
key.push(step.id, step.attempts + "")
}
return key.join(":")