fix(): Workflow cancellation + gracefully handle non serializable state (#10674)

FIXES FRMW-2852

**What**
A workflow distributed transaction expect any response and error to be serializable. When it is not the case, the distributed transaction might fail during the save checkpoint that occurs for async steps. This can lead to unexpected behaviour.

With this pr, we introduce a way to handle non serialazable object in a more sustainable manner, this means the following:

- If a workflow throw any non serialazable error (e.g AWS error that contains full IncomingMessage object that related to network communication, think of req/res) then we identify that this object is not serialzable and we clean up the object to make it serializable without loosing the main information, add a new error to the workflow to informed of this issue and can be handled by the user.
- If a response is not serializable (which should not happen at this point because it is handled before by the value resolver), in that case, we wont be able to reuse that response to continue the workflow which means that the workflow is in a non runnable state. In that case we throw a specific error stating that a non serializable context is being provided

**second what**
This pr refactor the `runAsStep` to add better support for workflow cancelation, especially async ones
This commit is contained in:
Adrien de Peretti
2025-01-05 14:30:17 +01:00
committed by GitHub
parent 152a94e1e1
commit 7d8f6cf39f
9 changed files with 343 additions and 16 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
fix(): Workflow cancellation + gracefully handle non serializable state

View File

@@ -1,6 +1,7 @@
import { TransactionStepState, TransactionStepStatus } from "@medusajs/utils"
import { setTimeout } from "timers/promises"
import {
DistributedTransaction,
DistributedTransactionType,
TransactionHandlerType,
TransactionOrchestrator,
@@ -10,6 +11,7 @@ import {
TransactionStepTimeoutError,
TransactionTimeoutError,
} from "../../transaction"
import { BaseInMemoryDistributedTransactionStorage } from "../../transaction/datastore/base-in-memory-storage"
describe("Transaction Orchestrator", () => {
afterEach(() => {
@@ -151,6 +153,104 @@ describe("Transaction Orchestrator", () => {
expect(actionOrder).toEqual(["one", "two", "three", "four", "five", "six"])
})
it("Should gracefully handle non serializable error when an async step fails", async () => {
class BaseInMemoryDistributedTransactionStorage_ extends BaseInMemoryDistributedTransactionStorage {
scheduleRetry() {
return Promise.resolve()
}
}
DistributedTransaction.setStorage(
new BaseInMemoryDistributedTransactionStorage_()
)
const actionOrder: string[] = []
async function handler(
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload
) {
if (functionHandlerType === TransactionHandlerType.INVOKE) {
actionOrder.push(actionId)
}
if (
functionHandlerType === TransactionHandlerType.INVOKE &&
actionId === "three"
) {
const error = new Error("Step 3 failed")
const obj: any = {}
obj.self = obj
;(error as any).metadata = obj
throw error
}
}
const flow: TransactionStepsDefinition = {
next: [
{
action: "one",
},
{
action: "two",
next: {
action: "four",
next: {
action: "six",
},
},
},
{
action: "three",
async: true,
maxRetries: 0,
next: {
action: "five",
},
},
],
}
const strategy = new TransactionOrchestrator({
id: "transaction-name",
definition: flow,
})
const transaction = await strategy.beginTransaction(
"transaction_id_123",
handler
)
await strategy.resume(transaction)
expect(transaction.getErrors()).toHaveLength(2)
expect(transaction.getErrors()).toEqual([
{
action: "three",
error: {
message: "Step 3 failed",
name: "Error",
stack: expect.any(String),
},
handlerType: "invoke",
},
{
action: "three",
error: expect.objectContaining({
message: expect.stringContaining(
"Converting circular structure to JSON"
),
stack: expect.any(String),
}),
handlerType: "invoke",
},
])
DistributedTransaction.setStorage(
new BaseInMemoryDistributedTransactionStorage()
)
})
it("Should not execute next steps when a step fails", async () => {
const actionOrder: string[] = []
async function handler(

View File

@@ -9,6 +9,7 @@ import {
TransactionHandlerType,
TransactionState,
} from "./types"
import { NonSerializableCheckPointError } from "./errors"
/**
* @typedef TransactionMetadata
@@ -204,19 +205,14 @@ class DistributedTransaction extends EventEmitter {
return
}
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
const key = TransactionOrchestrator.getKeyName(
DistributedTransaction.keyPrefix,
this.modelId,
this.transactionId
)
const rawData = JSON.parse(JSON.stringify(data))
const rawData = this.#serializeCheckpointData()
await DistributedTransaction.keyValueStore.save(key, rawData, ttl, options)
return rawData
@@ -320,6 +316,76 @@ class DistributedTransaction extends EventEmitter {
public hasTemporaryData(key: string) {
return this.#temporaryStorage.has(key)
}
/**
* Try to serialize the checkpoint data
* If it fails, it means that the context or the errors are not serializable
* and we should handle it
*
* @internal
* @returns
*/
#serializeCheckpointData() {
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
const isSerializable = (obj) => {
try {
JSON.parse(JSON.stringify(obj))
return true
} catch {
return false
}
}
let rawData
try {
rawData = JSON.parse(JSON.stringify(data))
} catch (e) {
if (!isSerializable(this.context)) {
// This is a safe guard, we should never reach this point
// If we do, it means that the context is not serializable
// and we should throw an error
throw new NonSerializableCheckPointError(
"Unable to serialize context object. Please make sure the workflow input and steps response are serializable."
)
}
if (!isSerializable(this.errors)) {
const nonSerializableErrors: TransactionStepError[] = []
for (const error of this.errors) {
if (!isSerializable(error.error)) {
error.error = {
name: error.error.name,
message: error.error.message,
stack: error.error.stack,
}
nonSerializableErrors.push({
...error,
error: e,
})
}
}
if (nonSerializableErrors.length) {
this.errors.push(...nonSerializableErrors)
}
}
const data = new TransactionCheckpoint(
this.getFlow(),
this.getContext(),
this.getErrors()
)
rawData = JSON.parse(JSON.stringify(data))
}
return rawData
}
}
DistributedTransaction.setStorage(

View File

@@ -68,3 +68,19 @@ export class TransactionTimeoutError extends BaseStepErrror {
super("TransactionTimeoutError", message, stepResponse)
}
}
export class NonSerializableCheckPointError extends Error {
static isNonSerializableCheckPointError(
error: Error
): error is NonSerializableCheckPointError {
return (
error instanceof NonSerializableCheckPointError ||
error?.name === "NonSerializableCheckPointError"
)
}
constructor(message?: string) {
super(message)
this.name = "NonSerializableCheckPointError"
}
}

View File

@@ -1,3 +1,4 @@
import { TransactionState } from "@medusajs/utils"
import { createStep } from "../create-step"
import { createWorkflow } from "../create-workflow"
import { StepResponse } from "../helpers"
@@ -42,6 +43,44 @@ describe("Workflow composer", () => {
expect(result).toEqual({ result: "hi from outside" })
})
it("should cancel transaction on failed sub workflow call", async function () {
const step1 = createStep("step1", async (_, context) => {
return new StepResponse("step1")
})
const step2 = createStep("step2", async (input: string, context) => {
return new StepResponse({ result: input })
})
const step3 = createStep("step3", async (input: string, context) => {
throw new Error("I have failed")
})
const subWorkflow = createWorkflow(
getNewWorkflowId(),
function (input: WorkflowData<string>) {
step1()
return new WorkflowResponse(step2(input))
}
)
const workflow = createWorkflow(getNewWorkflowId(), function () {
const subWorkflowRes = subWorkflow.runAsStep({
input: "hi from outside",
})
return new WorkflowResponse(step3(subWorkflowRes.result))
})
const { errors, transaction } = await workflow.run({
input: {},
throwOnError: false,
})
expect(errors).toHaveLength(1)
expect(errors[0].error.message).toEqual("I have failed")
expect(transaction.getState()).toEqual(TransactionState.REVERTED)
})
it("should skip step if condition is false", async function () {
const step1 = createStep("step1", async (_, context) => {
return new StepResponse({ result: "step1" })

View File

@@ -10,7 +10,7 @@ import {
OrchestrationUtils,
} from "@medusajs/utils"
import { ulid } from "ulid"
import { exportWorkflow } from "../../helper"
import { exportWorkflow, WorkflowResult } from "../../helper"
import { createStep } from "./create-step"
import { proxify } from "./helpers/proxy"
import { StepResponse } from "./helpers/step-response"
@@ -201,20 +201,29 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
},
})
const { result, transaction: flowTransaction } = transaction
const { result } = transaction
if (!context.isAsync || flowTransaction.hasFinished()) {
return new StepResponse(result, transaction)
}
return
return new StepResponse(
result,
context.isAsync ? stepContext.transactionId : transaction
)
},
async (transaction, { container }) => {
async (transaction, stepContext) => {
if (!transaction) {
return
}
await workflow(container).cancel(transaction)
const { container, ...sharedContext } = stepContext
await workflow(container).cancel({
transaction: (transaction as WorkflowResult<any>).transaction,
transactionId: isString(transaction) ? transaction : undefined,
container,
context: {
...sharedContext,
parentStepIdempotencyKey: stepContext.idempotencyKey,
},
})
}
)(input) as ReturnType<StepFunction<TData, TResult>>

View File

@@ -4,3 +4,4 @@ export * from "./workflow_async"
export * from "./workflow_step_timeout"
export * from "./workflow_transaction_timeout"
export * from "./workflow_when"
export * from "./workflow_async_compensate"

View File

@@ -0,0 +1,51 @@
import {
createStep,
createWorkflow,
parallelize,
StepResponse,
WorkflowResponse,
} from "@medusajs/framework/workflows-sdk"
const step_1_background = createStep(
{
name: "step_1_background_fail",
async: true,
},
jest.fn(async (input) => {
return new StepResponse(input)
})
)
const nestedWorkflow = createWorkflow(
{
name: "nested_sub_flow_async_fail",
},
function (input) {
const resp = step_1_background(input)
return resp
}
)
const step_2 = createStep(
{
name: "step_2_fail",
},
jest.fn(async () => {
throw new Error("step_2_fail")
})
)
createWorkflow(
{
name: "workflow_async_background_fail",
},
function (input) {
const ret = nestedWorkflow.runAsStep({
input,
})
step_2()
return new WorkflowResponse(ret)
}
)

View File

@@ -1,5 +1,6 @@
import {
DistributedTransactionType,
TransactionStep,
TransactionStepTimeoutError,
TransactionTimeoutError,
WorkflowManager,
@@ -473,6 +474,44 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
failTrap(done)
})
it("should cancel an async sub workflow when compensating", (done) => {
const workflowId = "workflow_async_background_fail"
void workflowOrcModule.run(workflowId, {
input: {
callSubFlow: true,
},
transactionId: "trx_123_compensate_async_sub_workflow",
throwOnError: false,
logOnError: false,
})
let onCompensateStepSuccess: { step: TransactionStep } | null = null
void workflowOrcModule.subscribe({
workflowId,
subscriber: (event) => {
if (event.eventType === "onCompensateStepSuccess") {
onCompensateStepSuccess = event
}
if (event.eventType === "onFinish") {
expect(onCompensateStepSuccess).toBeDefined()
expect(onCompensateStepSuccess!.step.id).toEqual(
"_root.nested_sub_flow_async_fail-as-step" // The workflow as step
)
expect(onCompensateStepSuccess!.step.compensate).toEqual({
state: "reverted",
status: "ok",
})
done()
}
},
})
failTrap(done)
})
})
// Note: These tests depend on actual Redis instance and waiting for the scheduled jobs to run, which isn't great.