feat(workflows-sdk): Allow when then in parallelize (#11756)

**What**
Update typings to allow using when then inside parallelize
This commit is contained in:
Adrien de Peretti
2025-03-06 14:23:37 +01:00
committed by GitHub
parent 3b470f4142
commit 84f991192e
5 changed files with 92 additions and 16 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/core-flows": patch
"@medusajs/workflows-sdk": patch
---
feat(workflows-sdk): Allow when then in parallelize

View File

@@ -19,19 +19,20 @@ import { updateProductsWorkflow } from "./update-products"
/**
* The products to manage.
*/
export interface BatchProductWorkflowInput extends BatchWorkflowInput<
CreateProductWorkflowInputDTO,
UpdateProductWorkflowInputDTO
> {}
export interface BatchProductWorkflowInput
extends BatchWorkflowInput<
CreateProductWorkflowInputDTO,
UpdateProductWorkflowInputDTO
> {}
export const batchProductsWorkflowId = "batch-products"
/**
* This workflow creates, updates, or deletes products. It's used by the
* [Manage Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsbatch).
*
*
* You can use this workflow within your own customizations or custom workflows to manage products in bulk. This is
* also useful when writing a [seed script](https://docs.medusajs.com/learn/fundamentals/custom-cli-scripts/seed-data) or a custom import script.
*
*
* @example
* const { result } = await batchProductsWorkflow(container)
* .run({
@@ -68,11 +69,11 @@ export const batchProductsWorkflowId = "batch-products"
* }
* ],
* delete: ["prod_321"]
* }
* }
* })
*
*
* @summary
*
*
* Manage products in bulk.
*/
export const batchProductsWorkflow = createWorkflow(

View File

@@ -19,10 +19,12 @@ import {
StepFunction,
StepResponse,
transform,
when,
WorkflowResponse,
} from ".."
import { MedusaWorkflow } from "../../../medusa-workflow"
import { createHook } from "../create-hook"
import { setTimeout } from "timers/promises"
jest.setTimeout(30000)
@@ -742,6 +744,71 @@ describe("Workflow composer", function () {
})
})
it("should compose a new workflow with conditional parallelized steps", async () => {
const stepResults: string[] = []
const mockStep1Fn = jest.fn().mockImplementation(async () => {
await setTimeout(100)
stepResults.push("step1")
return new StepResponse(true)
}) as any
const mockStep2Fn = jest.fn().mockImplementation(() => {
stepResults.push("step2")
return new StepResponse(true)
}) as any
const mockStep3Fn = jest.fn().mockImplementation(() => {
stepResults.push("step3")
return new StepResponse(true)
}) as any
const mockStep4Fn = jest.fn().mockImplementation(() => {
stepResults.push("step4")
return new StepResponse(true)
}) as any
const step1 = createStep("step1", mockStep1Fn)
const step2 = createStep("step2", mockStep2Fn)
const step3 = createStep("step3", mockStep3Fn)
const step4 = createStep("step4", mockStep4Fn)
const callStep2IfNeeded = () => {
return when({}, () => false).then(() => {
return step2()
})
}
const callStep3IfNeeded = () => {
return when({}, () => false).then(() => {
return step4()
})
}
const workflow = createWorkflow("workflow1", function (input) {
const [ret1, ret2, ret3, ret4] = parallelize(
step1(),
callStep2IfNeeded(),
step3(),
callStep3IfNeeded()
)
return new WorkflowResponse({ ret1, ret2, ret3, ret4 })
})
const { result: workflowResult } = await workflow().run()
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockStep2Fn).toHaveBeenCalledTimes(0)
expect(mockStep3Fn).toHaveBeenCalledTimes(1)
expect(mockStep4Fn).toHaveBeenCalledTimes(0)
expect(workflowResult).toEqual({
ret1: true,
ret2: undefined,
ret3: true,
ret4: undefined,
})
expect(stepResults).toEqual(["step3", "step1"])
})
it("should compose a new workflow with parallelize steps and rollback them all in case of error", async () => {
const step1CompensationFn = jest.fn().mockImplementation(() => {
return "step1 compensation"

View File

@@ -19,21 +19,21 @@ import { OrchestrationUtils } from "@medusajs/utils"
* createPricesStep,
* attachProductToSalesChannelStep
* } from "./steps"
*
*
* interface WorkflowInput {
* title: string
* }
*
*
* const myWorkflow = createWorkflow(
* "my-workflow",
* (input: WorkflowInput) => {
* const product = createProductStep(input)
*
*
* const [prices, productSalesChannel] = parallelize(
* createPricesStep(product),
* attachProductToSalesChannelStep(product)
* )
*
*
* return new WorkflowResponse({
* prices,
* productSalesChannel
@@ -41,7 +41,7 @@ import { OrchestrationUtils } from "@medusajs/utils"
* }
* )
*/
export function parallelize<TResult extends WorkflowData[]>(
export function parallelize<TResult extends (WorkflowData | undefined)[]>(
...steps: TResult
): TResult {
if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) {
@@ -64,7 +64,7 @@ export function parallelize<TResult extends WorkflowData[]>(
const stepOntoMerge = steps.shift()!
this.flow.mergeActions(
stepOntoMerge.__step__,
...steps.map((step) => step.__step__)
...steps.map((step) => step!.__step__)
)
return resultSteps as unknown as TResult

View File

@@ -104,7 +104,9 @@ export type CreateWorkflowComposerContext = {
fn: StepFunctionResult
) => WorkflowData<TOutput>
hookBinder: (name: string, fn: () => HookHandler) => void
parallelizeBinder: <TOutput extends WorkflowData[] = WorkflowData[]>(
parallelizeBinder: <
TOutput extends (WorkflowData | undefined)[] = WorkflowData[]
>(
fn: (this: CreateWorkflowComposerContext) => TOutput
) => TOutput
}