From 9cc787cac4bf1c5d8edf1c4b548bb3205100e822 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Tue, 19 Dec 2023 10:38:27 +0100 Subject: [PATCH] feat(workflows-sdk): Configurable retries upon step creation (#5728) **What** - Allow to create step that can be configured to have a max retry - Step end retry mechanism on permanent failure Also added an API to override a step configuration from within the createWorkflow ```ts const step = createStep({ name: "step", maxRetries: 3 }, async (_, context) => { return new StepResponse({ output: "output" }) }) const workflow = createWorkflow("workflow", function () { const res = step().config({ maxRetries: 5 }) // This will override the original maxRetries of 3 }) ``` **NOTE** We can maybe find another name than config on the step workflow data to override the step config. --- .changeset/eleven-jokes-tan.md | 7 ++ .../workflows/utils/composer/compose.ts | 101 ++++++++++++++++++ .../orchestration/src/transaction/errors.ts | 15 +++ .../orchestration/src/transaction/index.ts | 1 + .../transaction/transaction-orchestrator.ts | 41 ++++--- packages/utils/src/bundles.ts | 1 + packages/utils/src/index.ts | 1 + packages/utils/src/orchestration/index.ts | 1 + .../src/orchestration}/symbol.ts | 0 .../src/helper/workflow-export.ts | 11 +- .../src/utils/composer/create-step.ts | 54 ++++++---- .../src/utils/composer/create-workflow.ts | 20 ++-- .../src/utils/composer/helpers/index.ts | 3 +- .../src/utils/composer/helpers/proxy.ts | 6 +- .../utils/composer/helpers/resolve-value.ts | 25 ++--- .../utils/composer/helpers/step-response.ts | 14 ++- .../workflows-sdk/src/utils/composer/hook.ts | 13 ++- .../workflows-sdk/src/utils/composer/index.ts | 1 - .../src/utils/composer/parallelize.ts | 8 +- .../src/utils/composer/transform.ts | 5 +- .../workflows-sdk/src/utils/composer/type.ts | 19 ++-- 21 files changed, 255 insertions(+), 92 deletions(-) create mode 100644 .changeset/eleven-jokes-tan.md create mode 100644 packages/orchestration/src/transaction/errors.ts create mode 100644 packages/utils/src/orchestration/index.ts rename packages/{workflows-sdk/src/utils/composer/helpers => utils/src/orchestration}/symbol.ts (100%) diff --git a/.changeset/eleven-jokes-tan.md b/.changeset/eleven-jokes-tan.md new file mode 100644 index 0000000000..cfeac59b34 --- /dev/null +++ b/.changeset/eleven-jokes-tan.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/utils": patch +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): Configurable retries upon step creation diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts index b913a6f42a..27fb772003 100644 --- a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -16,6 +16,40 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return { inputs: [input], obj: "return from 1" } + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return { inputs: [input], obj: "return from 1" } @@ -928,6 +962,73 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed of retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + + it("should compose a new workflow composed of retryable steps that should stop retries on permanent failure", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return StepResponse.permanentFailure("fail permanently") + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { errors } = await workflow().run({ + input: workflowInput, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(errors).toHaveLength(1) + expect(errors[0]).toEqual({ + action: "step1", + handlerType: "invoke", + error: expect.objectContaining({ + message: "fail permanently", + }), + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return new StepResponse({ inputs: [input], obj: "return from 1" }) diff --git a/packages/orchestration/src/transaction/errors.ts b/packages/orchestration/src/transaction/errors.ts new file mode 100644 index 0000000000..95a093c5b3 --- /dev/null +++ b/packages/orchestration/src/transaction/errors.ts @@ -0,0 +1,15 @@ +export class PermanentStepFailureError extends Error { + static isPermanentStepFailureError( + error: Error + ): error is PermanentStepFailureError { + return ( + error instanceof PermanentStepFailureError || + error.name === "PermanentStepFailure" + ) + } + + constructor(message?: string) { + super(message) + this.name = "PermanentStepFailure" + } +} diff --git a/packages/orchestration/src/transaction/index.ts b/packages/orchestration/src/transaction/index.ts index 4380d8bfa6..6633e3b3c3 100644 --- a/packages/orchestration/src/transaction/index.ts +++ b/packages/orchestration/src/transaction/index.ts @@ -3,3 +3,4 @@ export * from "./transaction-orchestrator" export * from "./transaction-step" export * from "./distributed-transaction" export * from "./orchestrator-builder" +export * from "./errors" diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 6dbc1ef68d..fd6b74cd23 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -13,6 +13,7 @@ import { import { EventEmitter } from "events" import { promiseAll } from "@medusajs/utils" +import { PermanentStepFailureError } from "./errors" export type TransactionFlow = { modelId: string @@ -367,11 +368,23 @@ export class TransactionOrchestrator extends EventEmitter { transaction.getContext() ) + const setStepFailure = async ( + error: Error | any, + { endRetry }: { endRetry?: boolean } = {} + ) => { + return TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + endRetry ? 0 : step.definition.maxRetries + ) + } + if (!step.definition.async) { execution.push( transaction .handler(step.definition.action + "", type, payload, transaction) - .then(async (response) => { + .then(async (response: any) => { await TransactionOrchestrator.setStepSuccess( transaction, step, @@ -379,12 +392,13 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } + await setStepFailure(error) }) ) } else { @@ -393,12 +407,13 @@ export class TransactionOrchestrator extends EventEmitter { transaction .handler(step.definition.action + "", type, payload, transaction) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } + await setStepFailure(error) }) ) ) diff --git a/packages/utils/src/bundles.ts b/packages/utils/src/bundles.ts index 231a15d132..96d38b19bf 100644 --- a/packages/utils/src/bundles.ts +++ b/packages/utils/src/bundles.ts @@ -6,3 +6,4 @@ export * as ModulesSdkUtils from "./modules-sdk" export * as ProductUtils from "./product" export * as SearchUtils from "./search" export * as ShippingProfileUtils from "./shipping" +export * as OrchestrationUtils from "./orchestration" diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index e0e0870d5d..dd2be395ca 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -9,3 +9,4 @@ export * from "./pricing" export * from "./product" export * from "./search" export * from "./shipping" +export * from "./orchestration" diff --git a/packages/utils/src/orchestration/index.ts b/packages/utils/src/orchestration/index.ts new file mode 100644 index 0000000000..e6355e4311 --- /dev/null +++ b/packages/utils/src/orchestration/index.ts @@ -0,0 +1 @@ +export * from "./symbol" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/symbol.ts b/packages/utils/src/orchestration/symbol.ts similarity index 100% rename from packages/workflows-sdk/src/utils/composer/helpers/symbol.ts rename to packages/utils/src/orchestration/symbol.ts diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index a0614c6a4a..7b940609f2 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -10,7 +10,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { MedusaModule } from "@medusajs/modules-sdk" import { EOL } from "os" import { ulid } from "ulid" -import { SymbolWorkflowWorkflowData } from "../utils/composer" +import { OrchestrationUtils } from "@medusajs/utils" export type FlowRunOptions = { input?: TData @@ -99,11 +99,16 @@ export const exportWorkflow = ( if (Array.isArray(resultFrom)) { result = resultFrom.map((from) => { const res = transaction.getContext().invoke?.[from] - return res?.__type === SymbolWorkflowWorkflowData ? res.output : res + return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res }) } else { const res = transaction.getContext().invoke?.[resultFrom] - result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res + result = + res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res } } diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index a63feb8e8c..78e47f6933 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -1,12 +1,4 @@ -import { - resolveValue, - StepResponse, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - SymbolWorkflowStepBind, - SymbolWorkflowStepResponse, - SymbolWorkflowWorkflowData, -} from "./helpers" +import { resolveValue, StepResponse } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -15,6 +7,8 @@ import { WorkflowData, } from "./type" import { proxify } from "./helpers/proxy" +import { TransactionStepsDefinition } from "@medusajs/orchestration" +import { isString, OrchestrationUtils } from "@medusajs/utils" /** * The type of invocation function passed to a step. @@ -75,6 +69,7 @@ interface ApplyStepOptions< TInvokeResultCompensateInput > { stepName: string + stepConfig?: TransactionStepsDefinition input: TStepInputs invokeFn: InvokeFn< TInvokeInput, @@ -91,6 +86,7 @@ interface ApplyStepOptions< * This is where the inputs and context are passed to the underlying invoke and compensate function. * * @param stepName + * @param stepConfig * @param input * @param invokeFn * @param compensateFn @@ -104,6 +100,7 @@ function applyStep< TInvokeResultCompensateInput >({ stepName, + stepConfig = {}, input, invokeFn, compensateFn, @@ -135,12 +132,12 @@ function applyStep< ) const stepResponseJSON = - stepResponse?.__type === SymbolWorkflowStepResponse + stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepResponse.toJSON() : stepResponse return { - __type: SymbolWorkflowWorkflowData, + __type: OrchestrationUtils.SymbolWorkflowWorkflowData, output: stepResponseJSON, } }, @@ -154,7 +151,8 @@ function applyStep< const stepOutput = transactionContext.invoke[stepName]?.output const invokeResult = - stepOutput?.__type === SymbolWorkflowStepResponse + stepOutput?.__type === + OrchestrationUtils.SymbolWorkflowStepResponse ? stepOutput.compensateInput && JSON.parse(JSON.stringify(stepOutput.compensateInput)) : stepOutput && JSON.parse(JSON.stringify(stepOutput)) @@ -168,14 +166,21 @@ function applyStep< : undefined, } - this.flow.addAction(stepName, { - noCompensation: !compensateFn, - }) + stepConfig!.noCompensation = !compensateFn + + this.flow.addAction(stepName, stepConfig) this.handlers.set(stepName, handler) const ret = { - __type: SymbolWorkflowStep, + __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, + config: (config: Pick) => { + this.flow.replaceAction(stepName, stepName, { + ...stepConfig, + ...config, + }) + return proxify(ret) + }, } return proxify(ret) @@ -236,9 +241,11 @@ export function createStep< TInvokeResultCompensateInput >( /** - * The name of the step. + * The name of the step or its configuration (currently support maxRetries). */ - name: string, + nameOrConfig: + | string + | ({ name: string } & Pick), /** * An invocation function that will be executed when the workflow is executed. The function must return an instance of {@link StepResponse}. The constructor of {@link StepResponse} * accepts the output of the step as a first argument, and optionally as a second argument the data to be passed to the compensation function as a parameter. @@ -256,12 +263,14 @@ export function createStep< */ compensateFn?: CompensateFn ): StepFunction { - const stepName = name ?? invokeFn.name + const stepName = + (isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name) ?? invokeFn.name + const config = isString(nameOrConfig) ? {} : nameOrConfig const returnFn = function (input: { [K in keyof TInvokeInput]: WorkflowData }): WorkflowData { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "createStep must be used inside a createWorkflow definition" ) @@ -269,7 +278,7 @@ export function createStep< const stepBinder = ( global[ - SymbolMedusaWorkflowComposerContext + OrchestrationUtils.SymbolMedusaWorkflowComposerContext ] as CreateWorkflowComposerContext ).stepBinder @@ -281,6 +290,7 @@ export function createStep< TInvokeResultCompensateInput >({ stepName, + stepConfig: config, input, invokeFn, compensateFn, @@ -288,7 +298,7 @@ export function createStep< ) } - returnFn.__type = SymbolWorkflowStepBind + returnFn.__type = OrchestrationUtils.SymbolWorkflowStepBind returnFn.__step__ = stepName return returnFn as unknown as StepFunction diff --git a/packages/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/workflows-sdk/src/utils/composer/create-workflow.ts index d44a47f629..546515a65e 100644 --- a/packages/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/workflows-sdk/src/utils/composer/create-workflow.ts @@ -4,13 +4,9 @@ import { WorkflowManager, } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" -import { FlowRunOptions, WorkflowResult, exportWorkflow } from "../../helper" -import { - SymbolInputReference, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - resolveValue, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper" +import { resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -18,7 +14,7 @@ import { WorkflowDataProperties, } from "./type" -global[SymbolMedusaWorkflowComposerContext] = null +global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null /** * An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create @@ -189,16 +185,16 @@ export function createWorkflow< }, } - global[SymbolMedusaWorkflowComposerContext] = context + global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = context const inputPlaceHolder = proxify({ - __type: SymbolInputReference, + __type: OrchestrationUtils.SymbolInputReference, __step__: "", }) const returnedStep = composer.apply(context, [inputPlaceHolder]) - delete global[SymbolMedusaWorkflowComposerContext] + delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] WorkflowManager.update(name, context.flow, handlers) @@ -221,7 +217,7 @@ export function createWorkflow< > => { args ??= {} args.resultFrom ??= - returnedStep?.__type === SymbolWorkflowStep + returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep ? returnedStep.__step__ : undefined diff --git a/packages/workflows-sdk/src/utils/composer/helpers/index.ts b/packages/workflows-sdk/src/utils/composer/helpers/index.ts index 2636ee016f..4f369c7a5e 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/index.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/index.ts @@ -1,3 +1,2 @@ export * from "./step-response" -export * from "./symbol" -export * from "./resolve-value" \ No newline at end of file +export * from "./resolve-value" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts index 68ca47b2f3..91971fa9eb 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts @@ -1,6 +1,6 @@ import { transform } from "../transform" import { WorkflowData, WorkflowTransactionContext } from "../type" -import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" import { resolveValue } from "./resolve-value" export function proxify(obj: WorkflowData): T { @@ -13,8 +13,8 @@ export function proxify(obj: WorkflowData): T { return transform(target[prop], async function (input, context) { const { invoke } = context as WorkflowTransactionContext let output = - target.__type === SymbolInputReference || - target.__type === SymbolWorkflowStepTransformer + target.__type === OrchestrationUtils.SymbolInputReference || + target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer ? target : invoke?.[obj.__step__]?.output diff --git a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index c5fb131771..8fb49aa423 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -1,29 +1,26 @@ -import { promiseAll } from "@medusajs/utils" -import { - SymbolInputReference, - SymbolWorkflowHook, - SymbolWorkflowStep, - SymbolWorkflowStepResponse, - SymbolWorkflowStepTransformer, -} from "./symbol" +import { OrchestrationUtils, promiseAll } from "@medusajs/utils" async function resolveProperty(property, transactionContext) { const { invoke: invokeRes } = transactionContext - if (property?.__type === SymbolInputReference) { + if (property?.__type === OrchestrationUtils.SymbolInputReference) { return transactionContext.payload - } else if (property?.__type === SymbolWorkflowStepTransformer) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer + ) { return await property.__resolver(transactionContext) - } else if (property?.__type === SymbolWorkflowHook) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowHook) { return await property.__value(transactionContext) - } else if (property?.__type === SymbolWorkflowStep) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) { const output = invokeRes[property.__step__]?.output - if (output?.__type === SymbolWorkflowStepResponse) { + if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) { return output.output } return output - } else if (property?.__type === SymbolWorkflowStepResponse) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + ) { return property.output } else { return property diff --git a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts index a0ccaff81e..11ce065e66 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,4 +1,5 @@ -import { SymbolWorkflowStepResponse } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" +import { PermanentStepFailureError } from "@medusajs/orchestration" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -9,7 +10,7 @@ import { SymbolWorkflowStepResponse } from "./symbol" * as that of `TOutput`. */ export class StepResponse { - readonly #__type = SymbolWorkflowStepResponse + readonly #__type = OrchestrationUtils.SymbolWorkflowStepResponse readonly #output: TOutput readonly #compensateInput?: TCompensateInput @@ -35,6 +36,15 @@ export class StepResponse { this.#compensateInput = (compensateInput ?? output) as TCompensateInput } + /** + * Creates a StepResponse that indicates that the step has failed and the retry mechanism should not kick in anymore. + * + * @param message - An optional message to be logged. Default to `Permanent failure`. + */ + static permanentFailure(message = "Permanent failure"): never { + throw new PermanentStepFailureError(message) + } + /** * @internal */ diff --git a/packages/workflows-sdk/src/utils/composer/hook.ts b/packages/workflows-sdk/src/utils/composer/hook.ts index 8902d2011a..1dd8324455 100644 --- a/packages/workflows-sdk/src/utils/composer/hook.ts +++ b/packages/workflows-sdk/src/utils/composer/hook.ts @@ -1,8 +1,5 @@ -import { - resolveValue, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowHook, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { resolveValue } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -104,7 +101,9 @@ export function hook( value: any ): WorkflowData { const hookBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).hookBinder return hookBinder(name, function (context) { @@ -130,7 +129,7 @@ export function hook( } return finalResult }, - __type: SymbolWorkflowHook, + __type: OrchestrationUtils.SymbolWorkflowHook, } }) } diff --git a/packages/workflows-sdk/src/utils/composer/index.ts b/packages/workflows-sdk/src/utils/composer/index.ts index 485d8451cc..d654c7e7f0 100644 --- a/packages/workflows-sdk/src/utils/composer/index.ts +++ b/packages/workflows-sdk/src/utils/composer/index.ts @@ -3,7 +3,6 @@ export * from "./create-workflow" export * from "./hook" export * from "./parallelize" export * from "./helpers/resolve-value" -export * from "./helpers/symbol" export * from "./helpers/step-response" export * from "./transform" export * from "./type" diff --git a/packages/workflows-sdk/src/utils/composer/parallelize.ts b/packages/workflows-sdk/src/utils/composer/parallelize.ts index 500b3c5da1..f9ab8df149 100644 --- a/packages/workflows-sdk/src/utils/composer/parallelize.ts +++ b/packages/workflows-sdk/src/utils/composer/parallelize.ts @@ -1,5 +1,5 @@ import { CreateWorkflowComposerContext, WorkflowData } from "./type" -import { SymbolMedusaWorkflowComposerContext } from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" /** * This function is used to run multiple steps in parallel. The result of each step will be returned as part of the result array. @@ -43,14 +43,16 @@ import { SymbolMedusaWorkflowComposerContext } from "./helpers" export function parallelize( ...steps: TResult ): TResult { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "parallelize must be used inside a createWorkflow definition" ) } const parallelizeBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).parallelizeBinder const resultSteps = steps.map((step) => step) diff --git a/packages/workflows-sdk/src/utils/composer/transform.ts b/packages/workflows-sdk/src/utils/composer/transform.ts index 98f5d076cb..0b7020e630 100644 --- a/packages/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/workflows-sdk/src/utils/composer/transform.ts @@ -1,6 +1,7 @@ -import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers" +import { resolveValue } from "./helpers" import { StepExecutionContext, WorkflowData } from "./type" import { proxify } from "./helpers/proxy" +import { OrchestrationUtils } from "@medusajs/utils" type Func1 = ( input: T extends WorkflowData @@ -163,7 +164,7 @@ export function transform( ...functions: Function[] ): unknown { const ret = { - __type: SymbolWorkflowStepTransformer, + __type: OrchestrationUtils.SymbolWorkflowStepTransformer, __resolver: undefined, } diff --git a/packages/workflows-sdk/src/utils/composer/type.ts b/packages/workflows-sdk/src/utils/composer/type.ts index d74edf307e..14c0d8396e 100644 --- a/packages/workflows-sdk/src/utils/composer/type.ts +++ b/packages/workflows-sdk/src/utils/composer/type.ts @@ -2,18 +2,15 @@ import { OrchestratorBuilder, TransactionContext as OriginalWorkflowTransactionContext, TransactionPayload, + TransactionStepsDefinition, WorkflowHandler, } from "@medusajs/orchestration" import { Context, MedusaContainer } from "@medusajs/types" export type StepFunctionResult = - (this: CreateWorkflowComposerContext) => TOutput extends [] - ? [ - ...WorkflowData<{ - [K in keyof TOutput]: TOutput[number][K] - }>[] - ] - : WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> + ( + this: CreateWorkflowComposerContext + ) => WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> /** * A step function to be used in a workflow. @@ -24,7 +21,13 @@ export type StepFunctionResult = export type StepFunction = { (input: { [K in keyof TInput]: WorkflowData }): WorkflowData<{ [K in keyof TOutput]: TOutput[K] - }> + }> & { + config( + config: Pick + ): WorkflowData<{ + [K in keyof TOutput]: TOutput[K] + }> + } } & WorkflowDataProperties<{ [K in keyof TOutput]: TOutput[K] }>