diff --git a/.changeset/gentle-zebras-poke.md b/.changeset/gentle-zebras-poke.md new file mode 100644 index 0000000000..eab8b8e5df --- /dev/null +++ b/.changeset/gentle-zebras-poke.md @@ -0,0 +1,5 @@ +--- +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): Execute workflows as step in other workflows diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts index 8b7f3c03bc..7956841be4 100644 --- a/packages/orchestration/src/workflow/local-workflow.ts +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -1,11 +1,11 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { - MedusaContext, - MedusaContextType, - MedusaModuleType, createMedusaContainer, isDefined, isString, + MedusaContext, + MedusaContextType, + MedusaModuleType, } from "@medusajs/utils" import { asValue } from "awilix" import { @@ -29,7 +29,7 @@ type StepHandler = { } export class LocalWorkflow { - protected container: MedusaContainer + protected container_: MedusaContainer protected workflowId: string protected flow: OrchestratorBuilder protected customOptions: Partial = {} @@ -37,9 +37,17 @@ export class LocalWorkflow { protected handlers: Map protected medusaContext?: Context + get container() { + return this.container_ + } + + set container(modulesLoaded: LoadedModule[] | MedusaContainer) { + this.resolveContainer(modulesLoaded) + } + constructor( workflowId: string, - modulesLoaded: LoadedModule[] | MedusaContainer + modulesLoaded?: LoadedModule[] | MedusaContainer ) { const globalWorkflow = WorkflowManager.getWorkflow(workflowId) if (!globalWorkflow) { @@ -51,6 +59,10 @@ export class LocalWorkflow { this.workflow = globalWorkflow this.handlers = new Map(globalWorkflow.handlers_) + this.resolveContainer(modulesLoaded) + } + + private resolveContainer(modulesLoaded?: LoadedModule[] | MedusaContainer) { let container if (!Array.isArray(modulesLoaded) && modulesLoaded) { @@ -68,7 +80,7 @@ export class LocalWorkflow { } } - this.container = this.contextualizedMedusaModules(container) + this.container_ = this.contextualizedMedusaModules(container) } private contextualizedMedusaModules(container) { @@ -326,7 +338,7 @@ export class LocalWorkflow { const transaction = await orchestrator.beginTransaction( uniqueTransactionId, - handler(this.container, context), + handler(this.container_, context), input ) @@ -349,7 +361,7 @@ export class LocalWorkflow { const transaction = await orchestrator.retrieveExistingTransaction( uniqueTransactionId, - handler(this.container, context) + handler(this.container_, context) ) return transaction @@ -397,7 +409,7 @@ export class LocalWorkflow { const transaction = await orchestrator.registerStepSuccess( idempotencyKey, - handler(this.container, context), + handler(this.container_, context), undefined, response ) @@ -425,7 +437,7 @@ export class LocalWorkflow { const transaction = await orchestrator.registerStepFailure( idempotencyKey, error, - handler(this.container, context) + handler(this.container_, context) ) cleanUpEventListeners() diff --git a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index de1b6f2a7c..6ff48acaf0 100644 --- a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -5,17 +5,20 @@ import { TransactionStep, } from "@medusajs/orchestration" import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" +import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" import { - MedusaWorkflow, - ReturnWorkflow, - resolveValue, type FlowRunOptions, + MedusaWorkflow, + resolveValue, + ReturnWorkflow, } from "@medusajs/workflows-sdk" import { ulid } from "ulid" import { InMemoryDistributedTransactionStorage } from "../utils" -export type WorkflowOrchestratorRunOptions = FlowRunOptions & { +export type WorkflowOrchestratorRunOptions = Omit< + FlowRunOptions, + "container" +> & { transactionId?: string container?: ContainerLike } diff --git a/packages/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/workflow-engine-inmemory/src/services/workflows-module.ts index 8701e6d26d..0a1bb3ee96 100644 --- a/packages/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/workflow-engine-inmemory/src/services/workflows-module.ts @@ -9,9 +9,9 @@ import { import { InjectManager, InjectSharedContext, + isString, MedusaContext, MedusaError, - isString, } from "@medusajs/utils" import type { IWorkflowEngineService, @@ -191,7 +191,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { transactionId: string, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.getRunningTransaction( + return await this.workflowOrchestratorService_.getRunningTransaction( workflowId, transactionId, context @@ -211,7 +211,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.setStepSuccess( + return await this.workflowOrchestratorService_.setStepSuccess( { idempotencyKey, stepResponse, @@ -234,7 +234,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.setStepFailure( + return await this.workflowOrchestratorService_.setStepFailure( { idempotencyKey, stepResponse, diff --git a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts index 09d048df21..300d248344 100644 --- a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -10,18 +10,21 @@ import { Logger, MedusaContainer, } from "@medusajs/types" -import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" +import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" import { FlowRunOptions, MedusaWorkflow, - ReturnWorkflow, resolveValue, + ReturnWorkflow, } from "@medusajs/workflows-sdk" import Redis from "ioredis" import { ulid } from "ulid" import type { RedisDistributedTransactionStorage } from "../utils" -export type WorkflowOrchestratorRunOptions = FlowRunOptions & { +export type WorkflowOrchestratorRunOptions = Omit< + FlowRunOptions, + "container" +> & { transactionId?: string container?: ContainerLike } diff --git a/packages/workflows-sdk/src/helper/index.ts b/packages/workflows-sdk/src/helper/index.ts index a75943d8bb..9f8801da04 100644 --- a/packages/workflows-sdk/src/helper/index.ts +++ b/packages/workflows-sdk/src/helper/index.ts @@ -2,3 +2,4 @@ export * from "./merge-data" export * from "./empty-handler" export * from "./pipe" export * from "./workflow-export" +export * from "./type" diff --git a/packages/workflows-sdk/src/helper/type.ts b/packages/workflows-sdk/src/helper/type.ts new file mode 100644 index 0000000000..f9838126e7 --- /dev/null +++ b/packages/workflows-sdk/src/helper/type.ts @@ -0,0 +1,134 @@ +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { + DistributedTransaction, + DistributedTransactionEvents, + LocalWorkflow, + TransactionStepError, +} from "@medusajs/orchestration" + +export type FlowRunOptions = { + input?: TData + context?: Context + resultFrom?: string | string[] | Symbol + throwOnError?: boolean + events?: DistributedTransactionEvents + container?: LoadedModule[] | MedusaContainer +} + +export type FlowRegisterStepSuccessOptions = { + idempotencyKey: string + response?: TData + context?: Context + resultFrom?: string | string[] | Symbol + throwOnError?: boolean + events?: DistributedTransactionEvents + container?: LoadedModule[] | MedusaContainer +} + +export type FlowRegisterStepFailureOptions = { + idempotencyKey: string + response?: TData + context?: Context + resultFrom?: string | string[] | Symbol + throwOnError?: boolean + events?: DistributedTransactionEvents + container?: LoadedModule[] | MedusaContainer +} + +export type FlowCancelOptions = { + transaction?: DistributedTransaction + transactionId?: string + context?: Context + throwOnError?: boolean + events?: DistributedTransactionEvents + container?: LoadedModule[] | MedusaContainer +} + +export type WorkflowResult = { + errors: TransactionStepError[] + transaction: DistributedTransaction + result: TResult +} + +export type ExportedWorkflow< + TData = unknown, + TResult = unknown, + TDataOverride = undefined, + TResultOverride = undefined +> = { + run: ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + registerStepSuccess: ( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + registerStepFailure: ( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + cancel: (args?: FlowCancelOptions) => Promise +} + +export type MainExportedWorkflow = { + // Main function on the exported workflow + ( + container?: LoadedModule[] | MedusaContainer + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + > & + ExportedWorkflow +} & { + /** + * You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow + */ + + run( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepSuccess( + args?: FlowRegisterStepSuccessOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + registerStepFailure( + args?: FlowRegisterStepFailureOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ): Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + + cancel(args?: FlowCancelOptions): Promise +} diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index 49af002e64..ee48dee23c 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -1,12 +1,9 @@ import { - DistributedTransaction, - DistributedTransactionEvents, LocalWorkflow, TransactionHandlerType, TransactionState, - TransactionStepError, } from "@medusajs/orchestration" -import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { LoadedModule, MedusaContainer } from "@medusajs/types" import { MedusaModule } from "@medusajs/modules-sdk" import { MedusaContextType } from "@medusajs/utils" @@ -14,139 +11,15 @@ import { EOL } from "os" import { ulid } from "ulid" import { MedusaWorkflow } from "../medusa-workflow" import { resolveValue } from "../utils/composer" - -export type FlowRunOptions = { - input?: TData - context?: Context - resultFrom?: string | string[] | Symbol - throwOnError?: boolean - events?: DistributedTransactionEvents -} - -export type FlowRegisterStepSuccessOptions = { - idempotencyKey: string - response?: TData - context?: Context - resultFrom?: string | string[] | Symbol - throwOnError?: boolean - events?: DistributedTransactionEvents -} - -export type FlowRegisterStepFailureOptions = { - idempotencyKey: string - response?: TData - context?: Context - resultFrom?: string | string[] | Symbol - throwOnError?: boolean - events?: DistributedTransactionEvents -} - -export type FlowCancelOptions = { - transaction?: DistributedTransaction - transactionId?: string - context?: Context - throwOnError?: boolean - events?: DistributedTransactionEvents -} - -export type WorkflowResult = { - errors: TransactionStepError[] - transaction: DistributedTransaction - result: TResult -} - -export type ExportedWorkflow< - TData = unknown, - TResult = unknown, - TDataOverride = undefined, - TResultOverride = undefined -> = { - run: ( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - registerStepSuccess: ( - args?: FlowRegisterStepSuccessOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - registerStepFailure: ( - args?: FlowRegisterStepFailureOptions< - TDataOverride extends undefined ? TData : TDataOverride - > - ) => Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - cancel: (args?: FlowCancelOptions) => Promise -} - -export type MainExportedWorkflow = { - // Main function on the exported workflow - ( - container?: LoadedModule[] | MedusaContainer - ): Omit< - LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" - > & - ExportedWorkflow - - /** - * You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow - */ - - run( - args?: FlowRunOptions< - TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } - ): Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - - registerStepSuccess( - args?: FlowRegisterStepSuccessOptions< - TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } - ): Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - - registerStepFailure( - args?: FlowRegisterStepFailureOptions< - TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } - ): Promise< - WorkflowResult< - TResultOverride extends undefined ? TResult : TResultOverride - > - > - - cancel( - args?: FlowCancelOptions & { - container?: LoadedModule[] | MedusaContainer - } - ): Promise -} +import { + ExportedWorkflow, + FlowCancelOptions, + FlowRegisterStepFailureOptions, + FlowRegisterStepSuccessOptions, + FlowRunOptions, + MainExportedWorkflow, + WorkflowResult, +} from "./type" function createContextualWorkflowRunner< TData = unknown, @@ -172,12 +45,6 @@ function createContextualWorkflowRunner< "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" > & ExportedWorkflow { - if (!container) { - container = MedusaModule.getLoadedModules().map( - (mod) => Object.values(mod)[0] - ) - } - const flow = new LocalWorkflow(workflowId, container!) const originalRun = flow.run.bind(flow) @@ -187,9 +54,24 @@ function createContextualWorkflowRunner< const originalExecution = async ( method, - { throwOnError, resultFrom, isCancel = false }, + { + throwOnError, + resultFrom, + isCancel = false, + container: executionContainer, + }, ...args ) => { + if (!executionContainer && !flow.container) { + executionContainer = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] + ) + } + + if (!flow.container) { + flow.container = executionContainer + } + const transaction = await method.apply(method, args) let errors = transaction.getErrors(TransactionHandlerType.INVOKE) @@ -236,6 +118,7 @@ function createContextualWorkflowRunner< throwOnError, resultFrom, events, + container, }: FlowRunOptions = { throwOnError: true, resultFrom: defaultResult, @@ -269,7 +152,7 @@ function createContextualWorkflowRunner< return await originalExecution( originalRun, - { throwOnError, resultFrom }, + { throwOnError, resultFrom, container }, context.transactionId, input, context, @@ -286,6 +169,7 @@ function createContextualWorkflowRunner< throwOnError, resultFrom, events, + container, }: FlowRegisterStepSuccessOptions = { idempotencyKey: "", throwOnError: true, @@ -304,7 +188,7 @@ function createContextualWorkflowRunner< return await originalExecution( originalRegisterStepSuccess, - { throwOnError, resultFrom }, + { throwOnError, resultFrom, container }, idempotencyKey, response, context, @@ -321,6 +205,7 @@ function createContextualWorkflowRunner< throwOnError, resultFrom, events, + container, }: FlowRegisterStepFailureOptions = { idempotencyKey: "", throwOnError: true, @@ -339,7 +224,7 @@ function createContextualWorkflowRunner< return await originalExecution( originalRegisterStepFailure, - { throwOnError, resultFrom }, + { throwOnError, resultFrom, container }, idempotencyKey, response, context, @@ -355,6 +240,7 @@ function createContextualWorkflowRunner< context: outerContext, throwOnError, events, + container, }: FlowCancelOptions = { throwOnError: true, } @@ -373,6 +259,7 @@ function createContextualWorkflowRunner< throwOnError, resultFrom: undefined, isCancel: true, + container, }, transaction ?? transactionId, context, @@ -397,6 +284,7 @@ export const exportWorkflow = ( TDataOverride = undefined, TResultOverride = undefined >( + // TODO: rm when all usage have been migrated container?: LoadedModule[] | MedusaContainer ): Omit< LocalWorkflow, @@ -456,9 +344,7 @@ export const exportWorkflow = ( >( args?: FlowRunOptions< TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } + > ): Promise< WorkflowResult< TResultOverride extends undefined ? TResult : TResultOverride @@ -482,9 +368,7 @@ export const exportWorkflow = ( >( args?: FlowRegisterStepSuccessOptions< TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } + > ): Promise< WorkflowResult< TResultOverride extends undefined ? TResult : TResultOverride @@ -512,9 +396,7 @@ export const exportWorkflow = ( >( args?: FlowRegisterStepFailureOptions< TDataOverride extends undefined ? TData : TDataOverride - > & { - container?: LoadedModule[] | MedusaContainer - } + > ): Promise< WorkflowResult< TResultOverride extends undefined ? TResult : TResultOverride @@ -537,9 +419,7 @@ export const exportWorkflow = ( } exportedWorkflow.cancel = async ( - args?: FlowCancelOptions & { - container?: LoadedModule[] | MedusaContainer - } + args?: FlowCancelOptions ): Promise => { const container = args?.container delete args?.container @@ -552,5 +432,5 @@ export const exportWorkflow = ( } MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow) - return exportedWorkflow + return exportedWorkflow as MainExportedWorkflow } diff --git a/packages/workflows-sdk/src/utils/_playground.ts b/packages/workflows-sdk/src/utils/_playground.ts index 57d2b05b09..493371369e 100644 --- a/packages/workflows-sdk/src/utils/_playground.ts +++ b/packages/workflows-sdk/src/utils/_playground.ts @@ -1,7 +1,12 @@ -import { createStep, createWorkflow, StepResponse } from "./composer" +import { + createStep, + createWorkflow, + StepResponse, + WorkflowData, +} from "./composer" const step1 = createStep("step1", async (input: {}, context) => { - return new StepResponse({ step1: ["step1"] }) + return new StepResponse({ step1: "step1" }) }) type Step2Input = { filters: { id: string[] } } | { filters: { id: string } } @@ -13,17 +18,22 @@ const step3 = createStep("step3", async () => { return new StepResponse({ step3: "step3" }) }) -const workflow = createWorkflow("workflow", function () { - const step1Res = step1() - step3() - return step2({ filters: { id: step1Res.step1 } }) -}) +const workflow = createWorkflow( + "sub-workflow", + function (input: WorkflowData<{ outsideWorkflowData: string }>) { + step1() + step3() + return step2({ filters: { id: input.outsideWorkflowData } }) + } +) const workflow2 = createWorkflow("workflow", function () { const step1Res = step1() step3() - workflow() - return step2({ filters: { id: step1Res.step1 } }) + + const workflowRes = workflow.asStep({ outsideWorkflowData: step1Res.step1 }) + + return workflowRes }) workflow2() diff --git a/packages/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 4888ce5cb2..53217123c3 100644 --- a/packages/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -2,8 +2,109 @@ import { createStep } from "../create-step" import { createWorkflow } from "../create-workflow" import { StepResponse } from "../helpers" import { transform } from "../transform" +import { WorkflowData } from "../type" + +let count = 1 +const getNewWorkflowId = () => `workflow-${count++}` describe("Workflow composer", () => { + describe("running sub workflows", () => { + it("should succeed", async function () { + const step1 = createStep("step1", async () => { + return new StepResponse({ result: "step1" }) + }) + const step2 = createStep("step2", async (input: string) => { + return new StepResponse({ result: input }) + }) + const step3 = createStep("step3", async (input: string) => { + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow( + getNewWorkflowId(), + function (input: WorkflowData) { + step1() + return step2(input) + } + ) + + const workflow = createWorkflow(getNewWorkflowId(), function () { + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + return step3(subWorkflowRes.result) + }) + + const { result } = await workflow.run({ input: {} }) + + expect(result).toEqual({ result: "hi from outside" }) + }) + + it("should revert the workflow and sub workflow on failure", async function () { + const step1Mock = jest.fn() + const step1 = createStep( + "step1", + async () => { + return new StepResponse({ result: "step1" }) + }, + step1Mock + ) + + const step2Mock = jest.fn() + const step2 = createStep( + "step2", + async (input: string) => { + return new StepResponse({ result: input }) + }, + step2Mock + ) + + const step3Mock = jest.fn() + const step3 = createStep( + "step3", + async () => { + return new StepResponse() + }, + step3Mock + ) + + const step4WithError = createStep("step4", async () => { + throw new Error("Step4 failed") + }) + + const subWorkflow = createWorkflow( + getNewWorkflowId(), + function (input: WorkflowData) { + step1() + return step2(input) + } + ) + + const workflow = createWorkflow(getNewWorkflowId(), function () { + step3() + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + step4WithError() + return subWorkflowRes + }) + + const { errors } = await workflow.run({ throwOnError: false }) + + expect(errors).toEqual([ + expect.objectContaining({ + error: expect.objectContaining({ + message: "Step4 failed", + }), + }), + ]) + + expect(step1Mock).toHaveBeenCalledTimes(1) + expect(step2Mock).toHaveBeenCalledTimes(1) + expect(step3Mock).toHaveBeenCalledTimes(1) + }) + }) + it("should not throw an unhandled error on failed transformer resolution after a step fail, but should rather push the errors in the errors result", async function () { const step1 = createStep("step1", async () => { return new StepResponse({ result: "step1" }) @@ -25,7 +126,7 @@ describe("Workflow composer", () => { }) }) - const { errors } = await work().run({ input: {}, throwOnError: false }) + const { errors } = await work.run({ input: {}, throwOnError: false }) expect(errors).toEqual([ { diff --git a/packages/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/workflows-sdk/src/utils/composer/create-workflow.ts index dd4806e3bf..00c209633a 100644 --- a/packages/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/workflows-sdk/src/utils/composer/create-workflow.ts @@ -1,96 +1,24 @@ import { - LocalWorkflow, TransactionModelOptions, WorkflowHandler, WorkflowManager, } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" import { isString, OrchestrationUtils } from "@medusajs/utils" -import { ExportedWorkflow, exportWorkflow } from "../../helper" +import { exportWorkflow } from "../../helper" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, + ReturnWorkflow, + StepFunction, WorkflowData, WorkflowDataProperties, } from "./type" +import { createStep } from "./create-step" +import { StepResponse } from "./helpers" global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null -/** - * An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create - * an executable workflow, optionally within a specified container. So, to execute the workflow, you must invoke the exported workflow, then run the - * `run` method of the exported workflow. - * - * @example - * To execute a workflow: - * - * ```ts - * myWorkflow() - * .run({ - * input: { - * name: "John" - * } - * }) - * .then(({ result }) => { - * console.log(result) - * }) - * ``` - * - * To specify the container of the workflow, you can pass it as an argument to the call of the exported workflow. This is necessary when executing the workflow - * within a Medusa resource such as an API Route or a Subscriber. - * - * For example: - * - * ```ts - * import type { - * MedusaRequest, - * MedusaResponse - * } from "@medusajs/medusa"; - * import myWorkflow from "../../../workflows/hello-world"; - * - * export async function GET( - * req: MedusaRequest, - * res: MedusaResponse - * ) { - * const { result } = await myWorkflow(req.scope) - * .run({ - * input: { - * name: req.query.name as string - * } - * }) - * - * res.send(result) - * } - * ``` - */ -export type ReturnWorkflow< - TData, - TResult, - THooks extends Record -> = { - ( - container?: LoadedModule[] | MedusaContainer - ): Omit< - LocalWorkflow, - "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" - > & - ExportedWorkflow -} & THooks & { - getName: () => string - } & { - config: (config: TransactionModelOptions) => void - } - -/** - * Extract the raw type of the expected input data of a workflow. - * - * @example - * type WorkflowInputData = UnwrapWorkflowInputDataType - */ -export type UnwrapWorkflowInputDataType< - T extends ReturnWorkflow -> = T extends ReturnWorkflow ? TData : never - /** * This function creates a workflow with the provided name and a constructor function. * The constructor function builds the workflow from steps created by the {@link createStep} function. @@ -256,5 +184,38 @@ export function createWorkflow< mainFlow.getName = () => name + mainFlow.run = mainFlow().run + + mainFlow.runAsStep = ({ + input, + }: { + input: TData + }): ReturnType> => { + // TODO: Async sub workflow is not supported yet + // Info: Once the export workflow can fire the execution through the engine if loaded, the async workflow can be executed, + // the step would inherit the async configuration and subscribe to the onFinish event of the sub worklow and mark itself as success or failure + return createStep( + `${name}-as-step`, + async (stepInput: TData, stepContext) => { + const { container, ...sharedContext } = stepContext + + const transaction = await workflow.run({ + input: stepInput as any, + container, + context: sharedContext, + }) + + return new StepResponse(transaction.result, transaction) + }, + async (transaction, { container }) => { + if (!transaction) { + return + } + + await workflow(container).cancel(transaction) + } + )(input) as ReturnType> + } + return mainFlow as ReturnWorkflow } diff --git a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts index e98c3e4948..b8d887a8b8 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,5 +1,5 @@ import { PermanentStepFailureError } from "@medusajs/orchestration" -import { OrchestrationUtils } from "@medusajs/utils" +import { isDefined, OrchestrationUtils } from "@medusajs/utils" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -26,13 +26,15 @@ export class StepResponse { /** * The output of the step. */ - output: TOutput, + output?: TOutput, /** * The input to be passed as a parameter to the step's compensation function. If not provided, the `output` will be provided instead. */ compensateInput?: TCompensateInput ) { - this.#output = output + if (isDefined(output)) { + this.#output = output + } this.#compensateInput = (compensateInput ?? output) as TCompensateInput } diff --git a/packages/workflows-sdk/src/utils/composer/type.ts b/packages/workflows-sdk/src/utils/composer/type.ts index 9a07f80c2b..e7f2691042 100644 --- a/packages/workflows-sdk/src/utils/composer/type.ts +++ b/packages/workflows-sdk/src/utils/composer/type.ts @@ -1,11 +1,14 @@ import { + LocalWorkflow, OrchestratorBuilder, TransactionContext as OriginalWorkflowTransactionContext, + TransactionModelOptions, TransactionPayload, TransactionStepsDefinition, WorkflowHandler, } from "@medusajs/orchestration" -import { Context, MedusaContainer } from "@medusajs/types" +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { ExportedWorkflow } from "../../helper" export type StepFunctionResult = (this: CreateWorkflowComposerContext) => WorkflowData @@ -134,3 +137,89 @@ export type WorkflowTransactionContext = StepExecutionContext & OriginalWorkflowTransactionContext & { invoke: { [key: string]: { output: any } } } + +/** + * An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create + * an executable workflow, optionally within a specified container. So, to execute the workflow, you must invoke the exported workflow, then run the + * `run` method of the exported workflow. + * + * @example + * To execute a workflow: + * + * ```ts + * myWorkflow() + * .run({ + * input: { + * name: "John" + * } + * }) + * .then(({ result }) => { + * console.log(result) + * }) + * ``` + * + * To specify the container of the workflow, you can pass it as an argument to the call of the exported workflow. This is necessary when executing the workflow + * within a Medusa resource such as an API Route or a Subscriber. + * + * For example: + * + * ```ts + * import type { + * MedusaRequest, + * MedusaResponse + * } from "@medusajs/medusa"; + * import myWorkflow from "../../../workflows/hello-world"; + * + * export async function GET( + * req: MedusaRequest, + * res: MedusaResponse + * ) { + * const { result } = await myWorkflow(req.scope) + * .run({ + * input: { + * name: req.query.name as string + * } + * }) + * + * res.send(result) + * } + * ``` + */ +export type ReturnWorkflow< + TData, + TResult, + THooks extends Record +> = { + ( + container?: LoadedModule[] | MedusaContainer + ): Omit< + LocalWorkflow, + "run" | "registerStepSuccess" | "registerStepFailure" | "cancel" + > & + ExportedWorkflow +} & THooks & { + runAsStep: ({ + input, + }: { + input: TData + }) => ReturnType> + run: ( + ...args: Parameters< + ExportedWorkflow["run"] + > + ) => ReturnType< + ExportedWorkflow["run"] + > + getName: () => string + config: (config: TransactionModelOptions) => void + } + +/** + * Extract the raw type of the expected input data of a workflow. + * + * @example + * type WorkflowInputData = UnwrapWorkflowInputDataType + */ +export type UnwrapWorkflowInputDataType< + T extends ReturnWorkflow +> = T extends ReturnWorkflow ? TData : never