diff --git a/.changeset/rich-drinks-punch.md b/.changeset/rich-drinks-punch.md new file mode 100644 index 0000000000..ca92df7667 --- /dev/null +++ b/.changeset/rich-drinks-punch.md @@ -0,0 +1,6 @@ +--- +"@medusajs/core-flows": patch +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): Allow when then in parallelize diff --git a/packages/core/core-flows/src/product/workflows/batch-products.ts b/packages/core/core-flows/src/product/workflows/batch-products.ts index 9354c154ed..baf1d1bf4b 100644 --- a/packages/core/core-flows/src/product/workflows/batch-products.ts +++ b/packages/core/core-flows/src/product/workflows/batch-products.ts @@ -19,19 +19,20 @@ import { updateProductsWorkflow } from "./update-products" /** * The products to manage. */ -export interface BatchProductWorkflowInput extends BatchWorkflowInput< - CreateProductWorkflowInputDTO, - UpdateProductWorkflowInputDTO -> {} +export interface BatchProductWorkflowInput + extends BatchWorkflowInput< + CreateProductWorkflowInputDTO, + UpdateProductWorkflowInputDTO + > {} export const batchProductsWorkflowId = "batch-products" /** * This workflow creates, updates, or deletes products. It's used by the * [Manage Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsbatch). - * + * * You can use this workflow within your own customizations or custom workflows to manage products in bulk. This is * also useful when writing a [seed script](https://docs.medusajs.com/learn/fundamentals/custom-cli-scripts/seed-data) or a custom import script. - * + * * @example * const { result } = await batchProductsWorkflow(container) * .run({ @@ -68,11 +69,11 @@ export const batchProductsWorkflowId = "batch-products" * } * ], * delete: ["prod_321"] - * } + * } * }) - * + * * @summary - * + * * Manage products in bulk. */ export const batchProductsWorkflow = createWorkflow( diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts index 7e46d2d164..09f20b6dc2 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts @@ -19,10 +19,12 @@ import { StepFunction, StepResponse, transform, + when, WorkflowResponse, } from ".." import { MedusaWorkflow } from "../../../medusa-workflow" import { createHook } from "../create-hook" +import { setTimeout } from "timers/promises" jest.setTimeout(30000) @@ -742,6 +744,71 @@ describe("Workflow composer", function () { }) }) + it("should compose a new workflow with conditional parallelized steps", async () => { + const stepResults: string[] = [] + + const mockStep1Fn = jest.fn().mockImplementation(async () => { + await setTimeout(100) + stepResults.push("step1") + return new StepResponse(true) + }) as any + const mockStep2Fn = jest.fn().mockImplementation(() => { + stepResults.push("step2") + return new StepResponse(true) + }) as any + const mockStep3Fn = jest.fn().mockImplementation(() => { + stepResults.push("step3") + return new StepResponse(true) + }) as any + const mockStep4Fn = jest.fn().mockImplementation(() => { + stepResults.push("step4") + return new StepResponse(true) + }) as any + + const step1 = createStep("step1", mockStep1Fn) + const step2 = createStep("step2", mockStep2Fn) + const step3 = createStep("step3", mockStep3Fn) + const step4 = createStep("step4", mockStep4Fn) + + const callStep2IfNeeded = () => { + return when({}, () => false).then(() => { + return step2() + }) + } + + const callStep3IfNeeded = () => { + return when({}, () => false).then(() => { + return step4() + }) + } + + const workflow = createWorkflow("workflow1", function (input) { + const [ret1, ret2, ret3, ret4] = parallelize( + step1(), + callStep2IfNeeded(), + step3(), + callStep3IfNeeded() + ) + return new WorkflowResponse({ ret1, ret2, ret3, ret4 }) + }) + + const { result: workflowResult } = await workflow().run() + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep2Fn).toHaveBeenCalledTimes(0) + expect(mockStep3Fn).toHaveBeenCalledTimes(1) + expect(mockStep4Fn).toHaveBeenCalledTimes(0) + + expect(workflowResult).toEqual({ + ret1: true, + ret2: undefined, + ret3: true, + ret4: undefined, + }) + + expect(stepResults).toEqual(["step3", "step1"]) + }) + it("should compose a new workflow with parallelize steps and rollback them all in case of error", async () => { const step1CompensationFn = jest.fn().mockImplementation(() => { return "step1 compensation" diff --git a/packages/core/workflows-sdk/src/utils/composer/parallelize.ts b/packages/core/workflows-sdk/src/utils/composer/parallelize.ts index 3ddf4e20b3..581f324075 100644 --- a/packages/core/workflows-sdk/src/utils/composer/parallelize.ts +++ b/packages/core/workflows-sdk/src/utils/composer/parallelize.ts @@ -19,21 +19,21 @@ import { OrchestrationUtils } from "@medusajs/utils" * createPricesStep, * attachProductToSalesChannelStep * } from "./steps" - * + * * interface WorkflowInput { * title: string * } - * + * * const myWorkflow = createWorkflow( * "my-workflow", * (input: WorkflowInput) => { * const product = createProductStep(input) - * + * * const [prices, productSalesChannel] = parallelize( * createPricesStep(product), * attachProductToSalesChannelStep(product) * ) - * + * * return new WorkflowResponse({ * prices, * productSalesChannel @@ -41,7 +41,7 @@ import { OrchestrationUtils } from "@medusajs/utils" * } * ) */ -export function parallelize( +export function parallelize( ...steps: TResult ): TResult { if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { @@ -64,7 +64,7 @@ export function parallelize( const stepOntoMerge = steps.shift()! this.flow.mergeActions( stepOntoMerge.__step__, - ...steps.map((step) => step.__step__) + ...steps.map((step) => step!.__step__) ) return resultSteps as unknown as TResult diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 5a816c14b1..6e2027de09 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -104,7 +104,9 @@ export type CreateWorkflowComposerContext = { fn: StepFunctionResult ) => WorkflowData hookBinder: (name: string, fn: () => HookHandler) => void - parallelizeBinder: ( + parallelizeBinder: < + TOutput extends (WorkflowData | undefined)[] = WorkflowData[] + >( fn: (this: CreateWorkflowComposerContext) => TOutput ) => TOutput }