fix(orchestration): fix set step failure (#10031)

What:
 - copy data before saving checkpoint
 - removed unused data format function
 - properly handle registerStepFailure to not throw
 - emit onFinish event even when execution failed
This commit is contained in:
Carlos R. L. Rodrigues
2024-11-12 07:06:36 -03:00
committed by GitHub
parent 7794faf49e
commit 1eef324af3
10 changed files with 217 additions and 212 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
Fix set step failure for async steps

View File

@@ -215,9 +215,11 @@ class DistributedTransaction extends EventEmitter {
this.modelId,
this.transactionId
)
await DistributedTransaction.keyValueStore.save(key, data, ttl, options)
return data
const rawData = JSON.parse(JSON.stringify(data))
await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options)
return rawData
}
public static async loadTransaction(

View File

@@ -1,144 +0,0 @@
import { createMedusaContainer } from "@medusajs/utils"
import { MedusaWorkflow } from "../../medusa-workflow"
import { exportWorkflow } from "../workflow-export"
jest.mock("@medusajs/orchestration", () => {
return {
TransactionHandlerType: {
INVOKE: "invoke",
COMPENSATE: "compensate",
},
TransactionState: {
FAILED: "failed",
REVERTED: "reverted",
},
LocalWorkflow: jest.fn(() => {
return {
run: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
registerStepSuccess: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
registerStepFailure: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "done"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
cancel: jest.fn(() => {
return {
getErrors: jest.fn(),
getState: jest.fn(() => "reverted"),
getContext: jest.fn(() => {
return {
invoke: { result_step: "invoke_test" },
}
}),
}
}),
}
}),
}
})
describe("Export Workflow", function () {
afterEach(() => {
MedusaWorkflow.workflows = {}
})
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data
return data
})
const work = exportWorkflow("id" as any, "result_step", prepare)
const container = createMedusaContainer()
const wfHandler = work(container)
const input = {
test: "payload",
}
const { result } = await wfHandler.run({
input,
})
expect(input).toEqual({
test: "payload",
})
expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})
expect(result).toEqual("invoke_test")
})
describe("Using the exported workflow run", function () {
afterEach(() => {
MedusaWorkflow.workflows = {}
})
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data
return data
})
const work = exportWorkflow("id" as any, "result_step", prepare)
const input = {
test: "payload",
}
const container = createMedusaContainer()
const { result } = await work.run({
input,
container,
})
expect(input).toEqual({
test: "payload",
})
expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})
expect(result).toEqual("invoke_test")
})
})
})

View File

@@ -3,7 +3,6 @@ import {
DistributedTransactionEvents,
DistributedTransactionType,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
} from "@medusajs/orchestration"
import {
@@ -18,6 +17,7 @@ import {
isPresent,
MedusaContextType,
Modules,
TransactionHandlerType,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
@@ -41,13 +41,11 @@ function createContextualWorkflowRunner<
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
}: {
workflowId: string
defaultResult?: string | Symbol
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
sourcePath?: string
@@ -110,7 +108,10 @@ function createContextualWorkflowRunner<
events,
flowMetadata,
]
const transaction = await method.apply(method, args) as DistributedTransactionType
const transaction = (await method.apply(
method,
args
)) as DistributedTransactionType
let errors = transaction.getErrors(TransactionHandlerType.INVOKE)
@@ -118,16 +119,24 @@ function createContextualWorkflowRunner<
const isCancelled =
isCancel && transaction.getState() === TransactionState.REVERTED
const isRegisterStepFailure =
method === originalRegisterStepFailure &&
transaction.getState() === TransactionState.REVERTED
let thrownError = null
if (
!isCancelled &&
failedStatus.includes(transaction.getState()) &&
throwOnError
!isCancelled &&
!isRegisterStepFailure
) {
/*const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)*/
const firstError = errors?.[0]?.error ?? new Error("Unknown error")
throw firstError
thrownError = firstError
if (throwOnError) {
throw firstError
}
}
let result
@@ -135,6 +144,8 @@ function createContextualWorkflowRunner<
result = resolveValue(resultFrom, transaction.getContext())
if (result instanceof Promise) {
result = await result.catch((e) => {
thrownError = e
if (throwOnError) {
throw e
}
@@ -151,6 +162,7 @@ function createContextualWorkflowRunner<
errors,
transaction,
result,
thrownError,
}
}
@@ -175,22 +187,6 @@ function createContextualWorkflowRunner<
context.transactionId ??= ulid()
context.eventGroupId ??= ulid()
if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}
return await originalExecution(
originalRun,
{
@@ -339,7 +335,6 @@ function createContextualWorkflowRunner<
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string | Symbol,
dataPreparation?: (data: TData) => Promise<unknown>,
options?: {
wrappedInput?: boolean
sourcePath?: string
@@ -364,7 +359,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
@@ -390,7 +384,6 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})

View File

@@ -152,15 +152,10 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
WorkflowManager.register(name, context.flow, handlers, options)
}
const workflow = exportWorkflow<TData, TResult>(
name,
returnedStep,
undefined,
{
wrappedInput: true,
sourcePath: fileSourcePath,
}
)
const workflow = exportWorkflow<TData, TResult>(name, returnedStep, {
wrappedInput: true,
sourcePath: fileSourcePath,
})
const mainFlow = <TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer

View File

@@ -3,11 +3,14 @@ import {
createWorkflow,
StepResponse,
} from "@medusajs/framework/workflows-sdk"
import { setTimeout } from "timers/promises"
const step_1 = createStep(
"step_1",
jest.fn((input) => {
jest.fn(async (input) => {
input.test = "test"
await setTimeout(200)
return new StepResponse(input, { compensate: 123 })
}),
jest.fn((compensateInput) => {
@@ -27,9 +30,7 @@ createWorkflow(
timeout: 0.1, // 0.1 second
},
function (input) {
const resp = step_1(input).config({
async: true,
})
const resp = step_1(input)
return resp
}

View File

@@ -131,17 +131,20 @@ export class WorkflowOrchestratorService {
options?: WorkflowOrchestratorRunOptions<T>,
@MedusaContext() sharedContext: Context = {}
) {
let {
const {
input,
context,
transactionId,
resultFrom,
throwOnError,
logOnError,
events: eventHandlers,
container,
} = options ?? {}
let { throwOnError, context } = options ?? {}
throwOnError ??= true
context ??= {}
context.transactionId ??= transactionId ?? ulid()
const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
: workflowIdOrWorkflow.getName()
@@ -153,9 +156,6 @@ export class WorkflowOrchestratorService {
)
}
context ??= {}
context.transactionId ??= transactionId ?? ulid()
const events: FlowRunOptions["events"] = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
workflowId,
@@ -172,7 +172,7 @@ export class WorkflowOrchestratorService {
const ret = await exportedWorkflow.run({
input,
throwOnError,
throwOnError: false,
logOnError,
resultFrom,
context,
@@ -210,6 +210,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return { acknowledgement, ...ret }
}
@@ -262,13 +266,15 @@ export class WorkflowOrchestratorService {
) {
const {
context,
throwOnError,
logOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
let { throwOnError } = options ?? {}
throwOnError ??= true
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
@@ -287,7 +293,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
@@ -308,6 +314,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return ret
}
@@ -326,13 +336,15 @@ export class WorkflowOrchestratorService {
) {
const {
context,
throwOnError,
logOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
let { throwOnError } = options ?? {}
throwOnError ??= true
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
@@ -351,7 +363,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
@@ -372,6 +384,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return ret
}

View File

@@ -2,6 +2,7 @@ import {
createStep,
createWorkflow,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
const step_1 = createStep(
@@ -49,6 +50,14 @@ const step_3 = createStep(
})
)
const broken_step_2 = createStep(
"broken_step_2",
jest.fn(() => {}),
jest.fn((_, context) => {
throw new Error("Broken compensation step")
})
)
createWorkflow(
{
name: "workflow_2",
@@ -67,3 +76,19 @@ createWorkflow(
return step_3(ret2)
}
)
createWorkflow(
{
name: "workflow_2_revert_fail",
retentionTime: 1000,
},
function (input) {
step_1(input)
broken_step_2().config({
async: true,
})
return new WorkflowResponse("done")
}
)

View File

@@ -15,13 +15,13 @@ import {
TransactionHandlerType,
TransactionStepState,
} from "@medusajs/framework/utils"
import { asValue } from "awilix"
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { asValue } from "awilix"
import { setTimeout } from "timers/promises"
import { WorkflowsModuleService } from "../../src/services"
import "../__fixtures__"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { TestDatabase } from "../utils"
import { WorkflowsModuleService } from "../../src/services"
jest.setTimeout(999900000)
@@ -167,6 +167,98 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(executionsList).toHaveLength(1)
})
it("should return a list of failed workflow executions and keep it saved when there is a retentionTime set", async () => {
await workflowOrcModule.run("workflow_2", {
input: {
value: "123",
},
transactionId: "transaction_1",
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
await workflowOrcModule.setStepFailure({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_2",
transactionId: "transaction_1",
},
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id", "state"],
},
})
expect(executionsList).toHaveLength(1)
expect(executionsList[0].state).toEqual("reverted")
})
it("should throw if setStepFailure fails", async () => {
const { acknowledgement } = await workflowOrcModule.run(
"workflow_2_revert_fail",
{
input: {
value: "123",
},
}
)
let done = false
void workflowOrcModule.subscribe({
workflowId: "workflow_2_revert_fail",
transactionId: acknowledgement.transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
done = true
}
},
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
const setStepError = await workflowOrcModule
.setStepFailure({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "broken_step_2",
workflowId: "workflow_2_revert_fail",
transactionId: acknowledgement.transactionId,
},
stepResponse: { uhuuuu: "yeaah!" },
})
.catch((e) => {
return e
})
expect(setStepError).toEqual({ uhuuuu: "yeaah!" })
executionsList = await query({
workflow_executions: {
fields: ["id", "state", "context"],
},
})
expect(executionsList).toHaveLength(1)
expect(executionsList[0].state).toEqual("failed")
expect(done).toBe(true)
})
it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction, result, errors } = await workflowOrcModule.run(
"workflow_step_timeout",

View File

@@ -175,17 +175,21 @@ export class WorkflowOrchestratorService {
options?: WorkflowOrchestratorRunOptions<T>,
@MedusaContext() sharedContext: Context = {}
) {
let {
const {
input,
context,
transactionId,
resultFrom,
throwOnError,
logOnError,
events: eventHandlers,
container,
} = options ?? {}
let { throwOnError, context } = options ?? {}
throwOnError ??= true
context ??= {}
context.transactionId ??= transactionId ?? ulid()
const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
: workflowIdOrWorkflow.getName()
@@ -194,9 +198,6 @@ export class WorkflowOrchestratorService {
throw new Error("Workflow ID is required")
}
context ??= {}
context.transactionId ??= transactionId ?? ulid()
const events: FlowRunOptions["events"] = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
workflowId,
@@ -210,7 +211,7 @@ export class WorkflowOrchestratorService {
const ret = await exportedWorkflow.run({
input,
throwOnError,
throwOnError: false,
logOnError,
resultFrom,
context,
@@ -248,6 +249,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return { acknowledgement, ...ret }
}
@@ -299,13 +304,15 @@ export class WorkflowOrchestratorService {
) {
const {
context,
throwOnError,
logOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
let { throwOnError } = options ?? {}
throwOnError ??= true
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
@@ -324,7 +331,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
@@ -345,6 +352,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return ret
}
@@ -363,13 +374,15 @@ export class WorkflowOrchestratorService {
) {
const {
context,
throwOnError,
logOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
let { throwOnError } = options ?? {}
throwOnError ??= true
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
@@ -388,7 +401,7 @@ export class WorkflowOrchestratorService {
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
throwOnError: false,
logOnError,
events,
response: stepResponse,
@@ -409,6 +422,10 @@ export class WorkflowOrchestratorService {
await this.triggerParentStep(ret.transaction, result)
}
if (throwOnError && ret.thrownError) {
throw ret.thrownError
}
return ret
}