From 738e9115ec920d48bc52b8a690847e58c87ca28e Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 22 Jan 2024 20:14:18 +0100 Subject: [PATCH] feat(workflows-sdk): run, registerStepSuccess and registerStepFailure shortcut (#6164) **What** Currently, when exporting a workflow using the workflowExport util, it is mandatory to first call the resulted function passing the container before being able to call run, registerStepSuccess or failure on it. Now, it is possible to either continue that way, or to directly call the run, registerStepSuccess or failure on the exported workflow and at that moment it is possible to pass a container if needed e.g ```ts const workflow = exportWorkflow("id" as any, "result_step", prepare) const wfRunner = workflow(container) wfRunner.run(...) // Here the container is not expected ``` or ```ts const workflow = exportWorkflow("id" as any, "result_step", prepare) workflow.run(...) // Here we can now pass an optional container ``` --- .changeset/red-emus-carry.md | 5 + .../helper/__tests__/workflow-export.spec.ts | 40 +- .../src/helper/workflow-export.ts | 461 +++++++++++++----- 3 files changed, 375 insertions(+), 131 deletions(-) create mode 100644 .changeset/red-emus-carry.md diff --git a/.changeset/red-emus-carry.md b/.changeset/red-emus-carry.md new file mode 100644 index 0000000000..a70e4ca5bc --- /dev/null +++ b/.changeset/red-emus-carry.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): run, registerStepSuccess and registerStepFailure shortcut diff --git a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts index f479bd9046..23161e1afa 100644 --- a/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts +++ b/packages/workflows-sdk/src/helper/__tests__/workflow-export.spec.ts @@ -1,4 +1,5 @@ import { exportWorkflow } from "../workflow-export" +import { createMedusaContainer } from "@medusajs/utils" jest.mock("@medusajs/orchestration", () => { return { @@ -62,7 +63,8 @@ describe("Export Workflow", function () { const work = exportWorkflow("id" as any, "result_step", prepare) - const wfHandler = work() + const container = createMedusaContainer() + const wfHandler = work(container) const input = { test: "payload", @@ -83,4 +85,40 @@ describe("Export Workflow", function () { expect(result).toEqual("invoke_test") }) + + describe("Using the exported workflow run", function () { + it("should prepare the input data before initializing the transaction", async function () { + let transformedInput + const prepare = jest.fn().mockImplementation(async (data) => { + data.__transformed = true + transformedInput = data + + return data + }) + + const work = exportWorkflow("id" as any, "result_step", prepare) + + const input = { + test: "payload", + } + + const container = createMedusaContainer() + + const { result } = await work.run({ + input, + container, + }) + + expect(input).toEqual({ + test: "payload", + }) + + expect(transformedInput).toEqual({ + test: "payload", + __transformed: true, + }) + + expect(result).toEqual("invoke_test") + }) + }) }) diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index e65e9452dd..04797fd3fa 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -81,6 +81,217 @@ export type ExportedWorkflow< > } +export type MainExportedWorkflow = { + // Main function on the exported workflow + ( + container?: LoadedModule[] | MedusaContainer + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" + > & + ExportedWorkflow + + /** + * You can also directly call run, registerStepSuccess and registerStepFailure on the exported workflow + */ + + run( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepSuccess( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepFailure( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer + } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > +} + +function createContextualWorkflowRunner< + TData = unknown, + TResult = unknown, + TDataOverride = undefined, + TResultOverride = undefined +>({ + workflowId, + defaultResult, + dataPreparation, + options, + container, +}: { + workflowId: string + defaultResult?: string | Symbol + dataPreparation?: (data: TData) => Promise + options?: { + wrappedInput?: boolean + } + container?: LoadedModule[] | MedusaContainer +}): Omit & + ExportedWorkflow { + if (!container) { + container = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] + ) + } + + const flow = new LocalWorkflow(workflowId, container) + + const originalRun = flow.run.bind(flow) + const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) + const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) + + const originalExecution = async ( + method, + { throwOnError, resultFrom }, + ...args + ) => { + const transaction = await method.apply(method, args) + + const errors = transaction.getErrors(TransactionHandlerType.INVOKE) + + const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] + if (failedStatus.includes(transaction.getState()) && throwOnError) { + const errorMessage = errors + ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) + ?.join(`${EOL}`) + throw new Error(errorMessage) + } + + let result + if (options?.wrappedInput) { + result = await resolveValue(resultFrom, transaction.getContext()) + } else { + result = transaction.getContext().invoke?.[resultFrom] + } + + return { + errors, + transaction, + result, + } + } + + const newRun = async ( + { input, context, throwOnError, resultFrom, events }: FlowRunOptions = { + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + if (typeof dataPreparation === "function") { + try { + const copyInput = input ? JSON.parse(JSON.stringify(input)) : input + input = await dataPreparation(copyInput as TData) + } catch (err) { + if (throwOnError) { + throw new Error( + `Data preparation failed: ${err.message}${EOL}${err.stack}` + ) + } + return { + errors: [err], + } + } + } + + return await originalExecution( + originalRun, + { throwOnError, resultFrom }, + context?.transactionId ?? ulid(), + input, + context, + events + ) + } + flow.run = newRun as any + + const newRegisterStepSuccess = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepSuccessOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepSuccess, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) + } + flow.registerStepSuccess = newRegisterStepSuccess as any + + const newRegisterStepFailure = async ( + { + response, + idempotencyKey, + context, + throwOnError, + resultFrom, + events, + }: FlowRegisterStepFailureOptions = { + idempotencyKey: "", + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + resultFrom ??= defaultResult + throwOnError ??= true + + return await originalExecution( + originalRegisterStepFailure, + { throwOnError, resultFrom }, + idempotencyKey, + response, + context, + events + ) + } + flow.registerStepFailure = newRegisterStepFailure as any + + return flow as unknown as LocalWorkflow & + ExportedWorkflow +} + export const exportWorkflow = ( workflowId: string, defaultResult?: string | Symbol, @@ -88,7 +299,7 @@ export const exportWorkflow = ( options?: { wrappedInput?: boolean } -) => { +): MainExportedWorkflow => { function exportedWorkflow< TDataOverride = undefined, TResultOverride = undefined @@ -99,143 +310,133 @@ export const exportWorkflow = ( "run" | "registerStepSuccess" | "registerStepFailure" > & ExportedWorkflow { - if (!container) { - container = MedusaModule.getLoadedModules().map( - (mod) => Object.values(mod)[0] - ) + return createContextualWorkflowRunner< + TData, + TResult, + TDataOverride, + TResultOverride + >({ + workflowId, + defaultResult, + dataPreparation, + options, + container, + }) + } + + const buildRunnerFn = < + TAction extends "run" | "registerStepSuccess" | "registerStepFailure", + TDataOverride, + TResultOverride + >( + action: "run" | "registerStepSuccess" | "registerStepFailure", + container?: LoadedModule[] | MedusaContainer + ) => { + const contextualRunner = createContextualWorkflowRunner< + TData, + TResult, + TDataOverride, + TResultOverride + >({ + workflowId, + defaultResult, + dataPreparation, + options, + container, + }) + + return contextualRunner[action] as ExportedWorkflow< + TData, + TResult, + TDataOverride, + TResultOverride + >[TAction] + } + + exportedWorkflow.run = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const flow = new LocalWorkflow(workflowId, container) + return await buildRunnerFn<"run", TDataOverride, TResultOverride>( + "run", + container + )(inputArgs) + } - const originalRun = flow.run.bind(flow) - const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow) - const originalRegisterStepFailure = flow.registerStepFailure.bind(flow) - - const originalExecution = async ( - method, - { throwOnError, resultFrom }, - ...args - ) => { - const transaction = await method.apply(method, args) - - const errors = transaction.getErrors(TransactionHandlerType.INVOKE) - - const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] - if (failedStatus.includes(transaction.getState()) && throwOnError) { - const errorMessage = errors - ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) - ?.join(`${EOL}`) - throw new Error(errorMessage) - } - - let result - if (options?.wrappedInput) { - result = await resolveValue(resultFrom, transaction.getContext()) - } else { - result = transaction.getContext().invoke?.[resultFrom] - } - - return { - errors, - transaction, - result, - } + exportedWorkflow.registerStepSuccess = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const newRun = async ( - { input, context, throwOnError, resultFrom, events }: FlowRunOptions = { - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true + return await buildRunnerFn< + "registerStepSuccess", + TDataOverride, + TResultOverride + >( + "registerStepSuccess", + container + )(inputArgs) + } - if (typeof dataPreparation === "function") { - try { - const copyInput = input ? JSON.parse(JSON.stringify(input)) : input - input = await dataPreparation(copyInput as TData) - } catch (err) { - if (throwOnError) { - throw new Error( - `Data preparation failed: ${err.message}${EOL}${err.stack}` - ) - } - return { - errors: [err], - } - } - } - - return await originalExecution( - originalRun, - { throwOnError, resultFrom }, - context?.transactionId ?? ulid(), - input, - context, - events - ) + exportedWorkflow.registerStepFailure = async < + TDataOverride = undefined, + TResultOverride = undefined + >( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > & { + container?: LoadedModule[] | MedusaContainer } - flow.run = newRun as any + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > => { + const container = args?.container + delete args?.container + const inputArgs = { ...args } as FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > - const newRegisterStepSuccess = async ( - { - response, - idempotencyKey, - context, - throwOnError, - resultFrom, - events, - }: FlowRegisterStepSuccessOptions = { - idempotencyKey: "", - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true - - return await originalExecution( - originalRegisterStepSuccess, - { throwOnError, resultFrom }, - idempotencyKey, - response, - context, - events - ) - } - flow.registerStepSuccess = newRegisterStepSuccess as any - - const newRegisterStepFailure = async ( - { - response, - idempotencyKey, - context, - throwOnError, - resultFrom, - events, - }: FlowRegisterStepFailureOptions = { - idempotencyKey: "", - throwOnError: true, - resultFrom: defaultResult, - } - ) => { - resultFrom ??= defaultResult - throwOnError ??= true - - return await originalExecution( - originalRegisterStepFailure, - { throwOnError, resultFrom }, - idempotencyKey, - response, - context, - events - ) - } - flow.registerStepFailure = newRegisterStepFailure as any - - return flow as unknown as LocalWorkflow & - ExportedWorkflow + return await buildRunnerFn< + "registerStepFailure", + TDataOverride, + TResultOverride + >( + "registerStepFailure", + container + )(inputArgs) } MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)