feat(workflows-sdk): add response to permanent failure (#10177)

This commit is contained in:
Carlos R. L. Rodrigues
2024-11-20 17:29:37 -03:00
committed by GitHub
parent 42c08fa8e0
commit aeb5b43692
5 changed files with 146 additions and 23 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
---
Add response to permanent failure

View File

@@ -1,4 +1,18 @@
export class PermanentStepFailureError extends Error {
class BaseStepErrror extends Error {
#stepResponse: unknown
constructor(name, message?: string, stepResponse?: unknown) {
super(message)
this.name = name
this.#stepResponse = stepResponse
}
getStepResponse(): unknown {
return this.#stepResponse
}
}
export class PermanentStepFailureError extends BaseStepErrror {
static isPermanentStepFailureError(
error: Error
): error is PermanentStepFailureError {
@@ -8,26 +22,24 @@ export class PermanentStepFailureError extends Error {
)
}
constructor(message?: string) {
super(message)
this.name = "PermanentStepFailure"
constructor(message?: string, stepResponse?: unknown) {
super("PermanentStepFailure", message, stepResponse)
}
}
export class SkipStepResponse extends Error {
export class SkipStepResponse extends BaseStepErrror {
static isSkipStepResponse(error: Error): error is SkipStepResponse {
return (
error instanceof SkipStepResponse || error?.name === "SkipStepResponse"
)
}
constructor(message?: string) {
super(message)
this.name = "SkipStepResponse"
constructor(message?: string, stepResponse?: unknown) {
super("SkipStepResponse", message, stepResponse)
}
}
export class TransactionStepTimeoutError extends Error {
export class TransactionStepTimeoutError extends BaseStepErrror {
static isTransactionStepTimeoutError(
error: Error
): error is TransactionStepTimeoutError {
@@ -37,13 +49,12 @@ export class TransactionStepTimeoutError extends Error {
)
}
constructor(message?: string) {
super(message)
this.name = "TransactionStepTimeoutError"
constructor(message?: string, stepResponse?: unknown) {
super("TransactionStepTimeoutError", message, stepResponse)
}
}
export class TransactionTimeoutError extends Error {
export class TransactionTimeoutError extends BaseStepErrror {
static isTransactionTimeoutError(
error: Error
): error is TransactionTimeoutError {
@@ -53,8 +64,7 @@ export class TransactionTimeoutError extends Error {
)
}
constructor(message?: string) {
super(message)
this.name = "TransactionTimeoutError"
constructor(message?: string, stepResponse?: unknown) {
super("TransactionTimeoutError", message, stepResponse)
}
}

View File

@@ -18,6 +18,7 @@ import {
} from "./types"
import {
isDefined,
isErrorLike,
MedusaError,
promiseAll,
@@ -764,8 +765,24 @@ export class TransactionOrchestrator extends EventEmitter {
const setStepFailure = async (
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
{
endRetry,
response,
}: {
endRetry?: boolean
response?: unknown
} = {}
) => {
if (isDefined(response) && step.saveResponse) {
transaction.addResponse(
step.definition.action!,
step.isCompensating()
? TransactionHandlerType.COMPENSATE
: TransactionHandlerType.INVOKE,
response
)
}
await TransactionOrchestrator.setStepFailure(
transaction,
step,
@@ -841,6 +858,8 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
@@ -852,11 +871,14 @@ export class TransactionOrchestrator extends EventEmitter {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
await setStepFailure(error, {
endRetry: true,
response,
})
return
}
await setStepFailure(error)
await setStepFailure(error, { response })
})
)
} else {
@@ -917,14 +939,19 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
const response = error?.getStepResponse?.()
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
await setStepFailure(error, {
endRetry: true,
response,
})
return
}
await setStepFailure(error)
await setStepFailure(error, { response })
})
})
)

View File

@@ -2522,4 +2522,78 @@ describe("Workflow composer", function () {
"event-group-id"
)
})
it("should fail step and return response to compensate partial data", async () => {
const maxRetries = 3
const mockStep1Fn = jest.fn().mockImplementation(async (input, context) => {
const ok: number[] = []
const errors: number[] = []
const toInsert = [1, 2, 3, 4, 5, 6, 7, 8]
await promiseAll(
toInsert.map(async (i) => {
// fail on odd numbers
if (i % 2 === 0) {
ok.push(i)
return i
}
errors.push(i)
throw new Error("failed")
})
).catch((e) => {})
if (errors.length > 0) {
return StepResponse.permanentFailure(
"Error inserting " + errors.join(", "),
ok
)
}
return new StepResponse(ok)
})
const mockStep1CompensateFn = jest
.fn()
.mockImplementation((input, context) => {
return input
})
const step1 = createStep(
{ name: "step1", maxRetries },
mockStep1Fn,
mockStep1CompensateFn
)
const step2 = createStep("step2", () => {
throw new Error("failed")
})
const workflow = createWorkflow("workflow1", function (input) {
step1(input)
step2()
})
const workflowInput = { test: "payload1" }
const { errors } = await workflow().run({
input: workflowInput,
throwOnError: false,
})
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)
expect(mockStep1CompensateFn.mock.calls[0][0]).toEqual([2, 4, 6, 8])
expect(errors).toHaveLength(1)
expect(errors[0]).toEqual({
action: "step1",
handlerType: "invoke",
error: expect.objectContaining({
message: "Error inserting 1, 3, 5, 7",
}),
})
})
})

View File

@@ -115,8 +115,14 @@ export class StepResponse<TOutput, TCompensateInput = TOutput> {
* console.log(result)
* })
*/
static permanentFailure(message = "Permanent failure"): never {
throw new PermanentStepFailureError(message)
static permanentFailure(
message = "Permanent failure",
compensateInput?: unknown
): never {
const response = isDefined(compensateInput)
? new StepResponse(compensateInput)
: undefined
throw new PermanentStepFailureError(message, response)
}
static skip(): SkipStepResponse {