diff --git a/.changeset/brown-spoons-sparkle.md b/.changeset/brown-spoons-sparkle.md new file mode 100644 index 0000000000..d73ee5958b --- /dev/null +++ b/.changeset/brown-spoons-sparkle.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflows-sdk": patch +"@medusajs/orchestration": patch +--- + +Fix Workflow Engine subscribers response and error diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index e248b6e68e..ae19b20995 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -631,21 +631,30 @@ export class TransactionOrchestrator extends EventEmitter { transaction, }) + const isAsync = step.isCompensating() + ? step.definition.compensateAsync + : step.definition.async + const setStepFailure = async ( error: Error | any, { endRetry }: { endRetry?: boolean } = {} ) => { - return TransactionOrchestrator.setStepFailure( + const ret = TransactionOrchestrator.setStepFailure( transaction, step, error, endRetry ? 0 : step.definition.maxRetries ) - } - const isAsync = step.isCompensating() - ? step.definition.compensateAsync - : step.definition.async + if (isAsync) { + await transaction.scheduleRetry( + step, + step.definition.retryInterval ?? 0 + ) + } + + return ret + } if (!isAsync) { hasSyncSteps = true diff --git a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 55b2f33f15..87dcd93e7c 100644 --- a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -5,11 +5,12 @@ import { TransactionStep, } from "@medusajs/orchestration" import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" +import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" import { - type FlowRunOptions, MedusaWorkflow, ReturnWorkflow, + resolveValue, + type FlowRunOptions, } from "@medusajs/workflows-sdk" import { ulid } from "ulid" import { InMemoryDistributedTransactionStorage } from "../utils" @@ -462,31 +463,40 @@ export class WorkflowOrchestratorService { notify({ eventType: "onStepBegin", step }) }, - onStepSuccess: ({ step, transaction }) => { - const response = transaction.getContext().invoke[step.id] + onStepSuccess: async ({ step, transaction }) => { + const stepName = step.definition.action! + const response = await resolveValue( + transaction.getContext().invoke[stepName], + transaction + ) customEventHandlers?.onStepSuccess?.({ step, transaction, response }) notify({ eventType: "onStepSuccess", step, response }) }, onStepFailure: ({ step, transaction }) => { - const errors = transaction.getErrors(TransactionHandlerType.INVOKE)[ - step.id - ] + const stepName = step.definition.action! + const errors = transaction + .getErrors(TransactionHandlerType.INVOKE) + .filter((err) => err.action === stepName) + customEventHandlers?.onStepFailure?.({ step, transaction, errors }) notify({ eventType: "onStepFailure", step, errors }) }, onCompensateStepSuccess: ({ step, transaction }) => { - const response = transaction.getContext().compensate[step.id] + const stepName = step.definition.action! + const response = transaction.getContext().compensate[stepName] customEventHandlers?.onStepSuccess?.({ step, transaction, response }) notify({ eventType: "onCompensateStepSuccess", step, response }) }, onCompensateStepFailure: ({ step, transaction }) => { - const errors = transaction.getErrors(TransactionHandlerType.COMPENSATE)[ - step.id - ] + const stepName = step.definition.action! + const errors = transaction + .getErrors(TransactionHandlerType.COMPENSATE) + .filter((err) => err.action === stepName) + customEventHandlers?.onStepFailure?.({ step, transaction, errors }) notify({ eventType: "onCompensateStepFailure", step, errors }) diff --git a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts index 77770a5c74..8df4e3be02 100644 --- a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -10,6 +10,7 @@ import { FlowRunOptions, MedusaWorkflow, ReturnWorkflow, + resolveValue, } from "@medusajs/workflows-sdk" import Redis from "ioredis" import { ulid } from "ulid" @@ -512,30 +513,39 @@ export class WorkflowOrchestratorService { await notify({ eventType: "onStepBegin", step }) }, onStepSuccess: async ({ step, transaction }) => { - const response = transaction.getContext().invoke[step.id] + const stepName = step.definition.action! + const response = await resolveValue( + transaction.getContext().invoke[stepName], + transaction + ) customEventHandlers?.onStepSuccess?.({ step, transaction, response }) await notify({ eventType: "onStepSuccess", step, response }) }, onStepFailure: async ({ step, transaction }) => { - const errors = transaction.getErrors(TransactionHandlerType.INVOKE)[ - step.id - ] + const stepName = step.definition.action! + const errors = transaction + .getErrors(TransactionHandlerType.INVOKE) + .filter((err) => err.action === stepName) + customEventHandlers?.onStepFailure?.({ step, transaction, errors }) await notify({ eventType: "onStepFailure", step, errors }) }, onCompensateStepSuccess: async ({ step, transaction }) => { - const response = transaction.getContext().compensate[step.id] + const stepName = step.definition.action! + const response = transaction.getContext().compensate[stepName] customEventHandlers?.onStepSuccess?.({ step, transaction, response }) await notify({ eventType: "onCompensateStepSuccess", step, response }) }, onCompensateStepFailure: async ({ step, transaction }) => { - const errors = transaction.getErrors(TransactionHandlerType.COMPENSATE)[ - step.id - ] + const stepName = step.definition.action! + const errors = transaction + .getErrors(TransactionHandlerType.COMPENSATE) + .filter((err) => err.action === stepName) + customEventHandlers?.onStepFailure?.({ step, transaction, errors }) await notify({ eventType: "onCompensateStepFailure", step, errors }) diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 38dd4eb295..821703e396 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -99,22 +99,6 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt }) } - /*private stringifyWithSymbol(key, value) { - if (key === "__type" && typeof value === "symbol") { - return Symbol.keyFor(value) - } - - return value - } - - private jsonWithSymbol(key, value) { - if (key === "__type" && typeof value === "string") { - return Symbol.for(value) - } - - return value - }*/ - async get(key: string): Promise { const data = await this.redisClient.get(key) @@ -276,7 +260,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt const key = [type, transaction.modelId, transaction.transactionId] if (step) { - key.push(step.id) + key.push(step.id, step.attempts + "") } return key.join(":") diff --git a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index 0984b2821a..b8189fb8ea 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -32,8 +32,6 @@ async function resolveProperty(property, transactionContext) { * @internal */ export async function resolveValue(input, transactionContext) { - const copiedInput = deepCopy(input) - const unwrapInput = async ( inputTOUnwrap: Record, parentRef: any @@ -66,6 +64,11 @@ export async function resolveValue(input, transactionContext) { return parentRef } + const copiedInput = + input?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? deepCopy(input.output) + : deepCopy(input) + const result = copiedInput?.__type ? await resolveProperty(copiedInput, transactionContext) : await unwrapInput(copiedInput, {})