diff --git a/.changeset/red-emus-carry.md b/.changeset/red-emus-carry.md new file mode 100644 index 0000000000..a70e4ca5bc --- /dev/null +++ b/.changeset/red-emus-carry.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): run, registerStepSuccess and registerStepFailure shortcut diff --git a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts index f479bd9046..23161e1afa 100644 --- a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts @@ -1,4 +1,5 @@ import { exportWorkflow } from "../workflow-export" +import { createMedusaContainer } from "@medusajs/utils" jest.mock("@medusajs/orchestration", () => { return { @@ -62,7 +63,8 @@ describe("Export Workflow", function () { const work = exportWorkflow("id" as any, "result_step", prepare) - const wfHandler = work() + const container = createMedusaContainer() + const wfHandler = work(container) const input = { test: "payload", @@ -83,4 +85,40 @@ describe("Export Workflow", function () { expect(result).toEqual("invoke_test") }) + + describe("Using the exported workflow run", function () { + it("should prepare the input data before initializing the transaction", async function () { + let transformedInput + const prepare = jest.fn().mockImplementation(async (data) => { + data.__transformed = true + transformedInput = data + + return data + }) + + const work = exportWorkflow("id" as any, "result_step", prepare) + + const input = { + test: "payload", + } + + const container = createMedusaContainer() + + const { result } = await work.run({ + input, + container, + }) + + expect(input).toEqual({ + test: "payload", + }) + + expect(transformedInput).toEqual({ + test: "payload", + __transformed: true, + }) + + expect(result).toEqual("invoke_test") + }) + }) }) diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index e65e9452dd..04797fd3fa 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -81,6 +81,217 @@ export type ExportedWorkflow< > } +export type MainExportedWorkflow = { + // Main function on the exported workflow + ( + container?: LoadedModule[] | MedusaContainer + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" + > & + ExportedWorkflow + + /** + * You can also directly call run, registerStepSuccess and registerStepFailure on the exported workflow + */ + + run( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepSuccess( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepFailure( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > +} + +function createContextualWorkflowRunner< + TData = unknown, + TResult = unknown, + TDataOverride = undefined, + TResultOverride = undefined +>({ + workflowId, + defaultResult, + dataPreparation, + options, + container, +}: { + workflowId: string + defaultResult?: string | Symbol + dataPreparation?: (data: TData) => Promise + options?: { + wrappedInput?: boolean + } + container?: LoadedModule[] | MedusaContainer +}): Omit & + ExportedWorkflow { + if (!container) { + container = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] + ) + } + + const flow = new LocalWorkflow(workflowId, container) + + const originalRun = flow.run.bind(flow) + const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) + const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) + + const originalExecution = async ( + method, + { throwOnError, resultFrom }, + ...args + ) => { + const transaction = await method.apply(method, args) + + const errors = transaction.getErrors(TransactionHandlerType.INVOKE) + + const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] + if (failedStatus.includes(transaction.getState()) && throwOnError) { + const errorMessage = errors + ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) + ?.join(`${EOL}`) + throw new Error(errorMessage) + } + + let result + if (options?.wrappedInput) { + result = await resolveValue(resultFrom, transaction.getContext()) + } else { + result = transaction.getContext().invoke?.[resultFrom] + } + + return { + errors, + transaction, + result, + } + } + + const newRun = async ( + { input, context, throwOnError, resultFrom, events }: FlowRunOptions = { + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + if (typeof dataPreparation === "function") { + try { + const copyInput = input ? JSON.parse(JSON.stringify(input)) : input + input = await dataPreparation(copyInput as TData) + } catch (err) { + if (throwOnError) { + throw new Error( + `Data preparation failed: ${err.message}${EOL}${err.stack}` + ) + } + return { + errors: [err], + } + } + } + + return await originalExecution( + originalRun, + { throwOnError, resultFrom }, + context?.transactionId ?? ulid(), + input, + context, + events + ) + } + flow.run = newRun as any + + const newRegisterStepSuccess = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepSuccessOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepSuccess, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) + } + flow.registerStepSuccess = newRegisterStepSuccess as any + + const newRegisterStepFailure = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepFailureOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepFailure, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) + } + flow.registerStepFailure = newRegisterStepFailure as any + + return flow as unknown as LocalWorkflow & + ExportedWorkflow +} + export const exportWorkflow = ( workflowId: string, defaultResult?: string | Symbol, @@ -88,7 +299,7 @@ export const exportWorkflow = ( options?: { wrappedInput?: boolean } -) => { +): MainExportedWorkflow => { function exportedWorkflow< TDataOverride = undefined, TResultOverride = undefined @@ -99,143 +310,133 @@ export const exportWorkflow = ( "run" | "registerStepSuccess" | "registerStepFailure" > & ExportedWorkflow { - if (!container) { - container = MedusaModule.getLoadedModules().map( - (mod) => Object.values(mod)[0] - ) + return createContextualWorkflowRunner< + TData, + TResult, + TDataOverride, + TResultOverride + >({ + workflowId, + defaultResult, + dataPreparation, + options, + container, + }) + } + + const buildRunnerFn = < + TAction extends "run" | "registerStepSuccess" | "registerStepFailure", + TDataOverride, + TResultOverride + >( + action: "run" | "registerStepSuccess" | "registerStepFailure", + container?: LoadedModule[] | MedusaContainer + ) => { + const contextualRunner = createContextualWorkflowRunner< + TData, + TResult, + TDataOverride, + TResultOverride + >({ + workflowId, + defaultResult, + dataPreparation, + options, + container, + }) + + return contextualRunner[action] as ExportedWorkflow< + TData, + TResult, + TDataOverride, + TResultOverride + >[TAction] + } + + exportedWorkflow.run = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const flow = new LocalWorkflow(workflowId, container) + return await buildRunnerFn<"run", TDataOverride, TResultOverride>( + "run", + container + )(inputArgs) + } - const originalRun = flow.run.bind(flow) - const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) - const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) - - const originalExecution = async ( - method, - { throwOnError, resultFrom }, - ...args - ) => { - const transaction = await method.apply(method, args) - - const errors = transaction.getErrors(TransactionHandlerType.INVOKE) - - const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] - if (failedStatus.includes(transaction.getState()) && throwOnError) { - const errorMessage = errors - ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) - ?.join(`${EOL}`) - throw new Error(errorMessage) - } - - let result - if (options?.wrappedInput) { - result = await resolveValue(resultFrom, transaction.getContext()) - } else { - result = transaction.getContext().invoke?.[resultFrom] - } - - return { - errors, - transaction, - result, - } + exportedWorkflow.registerStepSuccess = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const newRun = async ( - { input, context, throwOnError, resultFrom, events }: FlowRunOptions = { - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true + return await buildRunnerFn< + "registerStepSuccess", + TDataOverride, + TResultOverride + >( + "registerStepSuccess", + container + )(inputArgs) + } - if (typeof dataPreparation === "function") { - try { - const copyInput = input ? JSON.parse(JSON.stringify(input)) : input - input = await dataPreparation(copyInput as TData) - } catch (err) { - if (throwOnError) { - throw new Error( - `Data preparation failed: ${err.message}${EOL}${err.stack}` - ) - } - return { - errors: [err], - } - } - } - - return await originalExecution( - originalRun, - { throwOnError, resultFrom }, - context?.transactionId ?? ulid(), - input, - context, - events - ) + exportedWorkflow.registerStepFailure = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } - flow.run = newRun as any + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const newRegisterStepSuccess = async ( - { - response, - idempotencyKey, - context, - throwOnError, - resultFrom, - events, - }: FlowRegisterStepSuccessOptions = { - idempotencyKey: "", - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true - - return await originalExecution( - originalRegisterStepSuccess, - { throwOnError, resultFrom }, - idempotencyKey, - response, - context, - events - ) - } - flow.registerStepSuccess = newRegisterStepSuccess as any - - const newRegisterStepFailure = async ( - { - response, - idempotencyKey, - context, - throwOnError, - resultFrom, - events, - }: FlowRegisterStepFailureOptions = { - idempotencyKey: "", - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true - - return await originalExecution( - originalRegisterStepFailure, - { throwOnError, resultFrom }, - idempotencyKey, - response, - context, - events - ) - } - flow.registerStepFailure = newRegisterStepFailure as any - - return flow as unknown as LocalWorkflow & - ExportedWorkflow + return await buildRunnerFn< + "registerStepFailure", + TDataOverride, + TResultOverride + >( + "registerStepFailure", + container + )(inputArgs) } MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)