From ef8dc4087ece0181ddbc6b3ad7fb96e463c477ff Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Mon, 16 Sep 2024 10:06:45 -0300 Subject: [PATCH] feat: run nested async workflows (#9119) --- .../__tests__/workflow-engine/tests.ts | 2 + .../workflows/create-product-variants.ts | 6 +- .../transaction/transaction-orchestrator.ts | 11 ++- .../orchestration/src/transaction/types.ts | 6 ++ packages/core/types/src/shared-context.ts | 5 ++ .../core/types/src/workflows-sdk/service.ts | 3 + .../src/helper/workflow-export.ts | 4 +- .../core/workflows-sdk/src/medusa-workflow.ts | 4 +- .../utils/composer/__tests__/index.spec.ts | 10 ++- .../src/utils/composer/create-step.ts | 19 ++--- .../src/utils/composer/create-workflow.ts | 29 ++++++-- .../composer/helpers/create-step-handler.ts | 16 +++-- .../workflows-sdk/src/utils/composer/type.ts | 6 ++ packages/modules/event-bus-redis/package.json | 2 +- .../__fixtures__/workflow_async.ts | 28 +++++++- .../workflow-engine-inmemory/package.json | 2 +- .../src/services/workflow-orchestrator.ts | 45 +++++++++++- .../__fixtures__/workflow_async.ts | 40 ++++++++++- .../integration-tests/__tests__/index.spec.ts | 51 ++++++------- .../workflow-engine-redis/package.json | 7 +- .../src/services/workflow-orchestrator.ts | 72 ++++++++++++++----- .../utils/workflow-orchestrator-storage.ts | 14 ++-- yarn.lock | 13 ++-- 23 files changed, 295 insertions(+), 100 deletions(-) diff --git a/integration-tests/modules/__tests__/workflow-engine/tests.ts b/integration-tests/modules/__tests__/workflow-engine/tests.ts index 4a402d0cbd..a5eae8ec4d 100644 --- a/integration-tests/modules/__tests__/workflow-engine/tests.ts +++ b/integration-tests/modules/__tests__/workflow-engine/tests.ts @@ -165,6 +165,8 @@ export const workflowEngineTestSuite = ( acknowledgement: { transactionId: "trx_123", workflowId: "my-workflow-name", + hasFailed: false, + hasFinished: false, }, }) diff --git a/packages/core/core-flows/src/product/workflows/create-product-variants.ts b/packages/core/core-flows/src/product/workflows/create-product-variants.ts index 10eb3f9004..7edd20553f 100644 --- a/packages/core/core-flows/src/product/workflows/create-product-variants.ts +++ b/packages/core/core-flows/src/product/workflows/create-product-variants.ts @@ -11,11 +11,11 @@ import { ProductVariantWorkflowEvents, } from "@medusajs/utils" import { + WorkflowData, + WorkflowResponse, createHook, createWorkflow, transform, - WorkflowData, - WorkflowResponse, } from "@medusajs/workflows-sdk" import { emitEventStep } from "../../common" import { createLinksWorkflow } from "../../common/workflows/create-links" @@ -97,7 +97,7 @@ const buildLinksToCreate = (data: { const linksToCreate: LinkDefinition[] = [] validateVariantsDuplicateInventoryItemIds( - data.createdVariants.map((variant, index) => { + (data.createdVariants ?? []).map((variant, index) => { const variantInput = data.input.product_variants[index] const inventoryItems = variantInput.inventory_items || [] diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 04130f239e..82b26281be 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -795,7 +795,10 @@ export class TransactionOrchestrator extends EventEmitter { this ) .then(async (response: any) => { - if (!step.definition.backgroundExecution) { + if ( + !step.definition.backgroundExecution || + step.definition.nested + ) { const eventName = DistributedTransactionEvent.STEP_AWAITING transaction.emit(eventName, { step, transaction }) @@ -825,6 +828,7 @@ export class TransactionOrchestrator extends EventEmitter { ) } + // check nested flow await transaction.scheduleRetry( step, step.definition.retryInterval ?? 0 @@ -1033,6 +1037,7 @@ export class TransactionOrchestrator extends EventEmitter { hasAsyncSteps: false, hasStepTimeouts: false, hasRetriesTimeout: false, + hasNestedTransactions: false, } while (queue.length > 0) { @@ -1073,6 +1078,10 @@ export class TransactionOrchestrator extends EventEmitter { features.hasRetriesTimeout = true } + if (definitionCopy.nested) { + features.hasNestedTransactions = true + } + states[id] = Object.assign( new TransactionStep(), existingSteps?.[id] || { diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index f6915f8793..ef48652858 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -65,6 +65,11 @@ export type TransactionStepsDefinition = { */ async?: boolean + /** + * It flags where the step contains a sub transaction inside itself. + */ + nested?: boolean + /** * It applies to "async" steps only, allowing them to run in the background and automatically complete without external intervention. * It is ideal for time-consuming tasks that will be complete after the execution, contrasting with standard "async" operations that require a response to be set in a later stage. @@ -237,6 +242,7 @@ export type TransactionFlow = { transactionId: string metadata?: { eventGroupId?: string + parentIdempotencyKey?: string [key: string]: unknown } hasAsyncSteps: boolean diff --git a/packages/core/types/src/shared-context.ts b/packages/core/types/src/shared-context.ts index d4000b4b79..75879267f4 100644 --- a/packages/core/types/src/shared-context.ts +++ b/packages/core/types/src/shared-context.ts @@ -85,4 +85,9 @@ export type Context = { * A string indicating the idempotencyKey of the current workflow execution. */ idempotencyKey?: string + + /** + * A string indicating the idempotencyKey of the parent workflow execution. + */ + parentStepIdempotencyKey?: string } diff --git a/packages/core/types/src/workflows-sdk/service.ts b/packages/core/types/src/workflows-sdk/service.ts index e7f4788997..59e719c3ba 100644 --- a/packages/core/types/src/workflows-sdk/service.ts +++ b/packages/core/types/src/workflows-sdk/service.ts @@ -18,6 +18,9 @@ type FlowRunOptions = { export type Acknowledgement = { workflowId: string transactionId: string + parentStepIdempotencyKey?: string + hasFinished: boolean + hasFailed: boolean } export interface WorkflowOrchestratorRunDTO diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index c51307f519..f5350ff202 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -74,6 +74,7 @@ function createContextualWorkflowRunner< ) => { if (!executionContainer) { const container_ = flow.container as MedusaContainer + if (!container_ || !isPresent(container_?.registrations)) { executionContainer = MedusaModule.getLoadedModules().map( (mod) => Object.values(mod)[0] @@ -85,12 +86,13 @@ function createContextualWorkflowRunner< flow.container = executionContainer } - const { eventGroupId } = context + const { eventGroupId, parentStepIdempotencyKey } = context attachOnFinishReleaseEvents(events, eventGroupId!, flow, { logOnError }) const flowMetadata = { eventGroupId, + parentStepIdempotencyKey, } const args = [ diff --git a/packages/core/workflows-sdk/src/medusa-workflow.ts b/packages/core/workflows-sdk/src/medusa-workflow.ts index 569335c70c..04602d0b13 100644 --- a/packages/core/workflows-sdk/src/medusa-workflow.ts +++ b/packages/core/workflows-sdk/src/medusa-workflow.ts @@ -22,8 +22,8 @@ class MedusaWorkflow { MedusaWorkflow.workflows[workflowId] = exportedWorkflow } - static getWorkflow(workflowId) { - return MedusaWorkflow.workflows[workflowId] + static getWorkflow(workflowId): ExportedWorkflow { + return MedusaWorkflow.workflows[workflowId] as unknown as ExportedWorkflow } } diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index b537f6a70e..64d06ad239 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -240,8 +240,10 @@ describe("Workflow composer", () => { expect(result).toEqual({ result: "hi from outside" }) - expect(parentContext.transactionId).toEqual("transactionId") - expect(parentContext.transactionId).toEqual(childContext.transactionId) + expect(parentContext.transactionId).toEqual(expect.any(String)) + expect(parentContext.transactionId).not.toEqual( + childContext.transactionId + ) expect(parentContext.eventGroupId).toEqual("eventGroupId") expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId) @@ -287,7 +289,9 @@ describe("Workflow composer", () => { expect(result).toEqual({ result: "hi from outside" }) expect(parentContext.transactionId).toBeTruthy() - expect(parentContext.transactionId).toEqual(childContext.transactionId) + expect(parentContext.transactionId).not.toEqual( + childContext.transactionId + ) expect(parentContext.eventGroupId).toBeTruthy() expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index d91d408413..2e6955b7c2 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -7,6 +7,7 @@ import { import { OrchestrationUtils, isString } from "@medusajs/utils" import { ulid } from "ulid" import { StepResponse, resolveValue } from "./helpers" +import { createStepHandler } from "./helpers/create-step-handler" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -15,7 +16,6 @@ import { StepFunctionResult, WorkflowData, } from "./type" -import { createStepHandler } from "./helpers/create-step-handler" /** * The type of invocation function passed to a step. @@ -65,6 +65,11 @@ export type CompensateFn = ( context: StepExecutionContext ) => unknown | Promise +export type LocalStepConfig = { name?: string } & Omit< + TransactionStepsDefinition, + "next" | "uuid" | "action" +> + export interface ApplyStepOptions< TStepInputs extends { [K in keyof TInvokeInput]: WorkflowData @@ -136,6 +141,8 @@ export function applyStep< this.flow.addAction(stepName, stepConfig) + this.isAsync ||= !!(stepConfig.async || stepConfig.compensateAsync) + if (!this.handlers.has(stepName)) { this.handlers.set(stepName, handler) } @@ -143,12 +150,7 @@ export function applyStep< const ret = { __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, - config: ( - localConfig: { name?: string } & Omit< - TransactionStepsDefinition, - "next" | "uuid" | "action" - > - ) => { + config: (localConfig: LocalStepConfig) => { const newStepName = localConfig.name ?? stepName const newConfig = { ...stepConfig, @@ -160,6 +162,7 @@ export function applyStep< this.handlers.set(newStepName, handler) this.flow.replaceAction(stepConfig.uuid!, newStepName, newConfig) + this.isAsync ||= !!(newConfig.async || newConfig.compensateAsync) ret.__step__ = newStepName WorkflowManager.update(this.workflowId, this.flow, this.handlers) @@ -178,7 +181,7 @@ export function applyStep< flagSteps.push(confRef) } - return confRef + return confRef as StepFunction }, if: ( input: any, diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index ecb218e056..8e11efa2ac 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -5,6 +5,7 @@ import { } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" import { OrchestrationUtils, isString } from "@medusajs/utils" +import { ulid } from "ulid" import { exportWorkflow } from "../../helper" import { createStep } from "./create-step" import { proxify } from "./helpers/proxy" @@ -104,6 +105,7 @@ export function createWorkflow( __type: OrchestrationUtils.SymbolMedusaWorkflowComposerContext, workflowId: name, flow: WorkflowManager.getEmptyTransactionDefinition(), + isAsync: false, handlers, hooks_: { declared: [], @@ -176,21 +178,32 @@ export function createWorkflow( }: { input: TData }): ReturnType> => { - // TODO: Async sub workflow is not supported yet - // Info: Once the export workflow can fire the execution through the engine if loaded, the async workflow can be executed, - // the step would inherit the async configuration and subscribe to the onFinish event of the sub worklow and mark itself as success or failure - return createStep( - `${name}-as-step`, + const step = createStep( + { + name: `${name}-as-step`, + async: context.isAsync, + nested: context.isAsync, // if async we flag this is a nested transaction + }, async (stepInput: TData, stepContext) => { const { container, ...sharedContext } = stepContext const transaction = await workflow.run({ input: stepInput as any, container, - context: sharedContext, + context: { + ...sharedContext, + parentStepIdempotencyKey: stepContext.idempotencyKey, + transactionId: ulid(), + }, }) - return new StepResponse(transaction.result, transaction) + const { result, transaction: flowTransaction } = transaction + + if (!context.isAsync || flowTransaction.hasFinished()) { + return new StepResponse(result, transaction) + } + + return }, async (transaction, { container }) => { if (!transaction) { @@ -200,6 +213,8 @@ export function createWorkflow( await workflow(container).cancel(transaction) } )(input) as ReturnType> + + return step } return mainFlow as ReturnWorkflow diff --git a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts index 5f9084b33f..06020f502f 100644 --- a/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts +++ b/packages/core/workflows-sdk/src/utils/composer/helpers/create-step-handler.ts @@ -1,13 +1,13 @@ +import { WorkflowStepHandlerArguments } from "@medusajs/orchestration" +import { OrchestrationUtils, deepCopy } from "@medusajs/utils" +import { ApplyStepOptions } from "../create-step" import { CreateWorkflowComposerContext, StepExecutionContext, WorkflowData, } from "../type" -import { WorkflowStepHandlerArguments } from "@medusajs/orchestration" import { resolveValue } from "./resolve-value" import { StepResponse } from "./step-response" -import { deepCopy, OrchestrationUtils } from "@medusajs/utils" -import { ApplyStepOptions } from "../create-step" export function createStepHandler< TInvokeInput, @@ -36,6 +36,8 @@ export function createStepHandler< const idempotencyKey = metadata.idempotency_key stepArguments.context!.idempotencyKey = idempotencyKey + + const flowMetadata = stepArguments.transaction.getFlow()?.metadata const executionContext: StepExecutionContext = { workflowId: metadata.model_id, stepName: metadata.action, @@ -45,8 +47,9 @@ export function createStepHandler< container: stepArguments.container, metadata, eventGroupId: - stepArguments.transaction.getFlow()?.metadata?.eventGroupId ?? - stepArguments.context!.eventGroupId, + flowMetadata?.eventGroupId ?? stepArguments.context!.eventGroupId, + parentStepIdempotencyKey: + flowMetadata?.parentStepIdempotencyKey as string, transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, } @@ -74,11 +77,14 @@ export function createStepHandler< stepArguments.context!.idempotencyKey = idempotencyKey + const flowMetadata = stepArguments.transaction.getFlow()?.metadata const executionContext: StepExecutionContext = { workflowId: metadata.model_id, stepName: metadata.action, action: "compensate", idempotencyKey, + parentStepIdempotencyKey: + flowMetadata?.parentStepIdempotencyKey as string, attempt: metadata.attempt, container: stepArguments.container, metadata, diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index e291fe712e..b10b8b32ec 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -98,6 +98,7 @@ export type CreateWorkflowComposerContext = { hooksCallback_: Record workflowId: string flow: OrchestratorBuilder + isAsync: boolean handlers: WorkflowHandler stepBinder: ( fn: StepFunctionResult @@ -127,6 +128,11 @@ export interface StepExecutionContext { */ idempotencyKey: string + /** + * The idempoency key of the parent step. + */ + parentStepIdempotencyKey?: string + /** * The name of the step. */ diff --git a/packages/modules/event-bus-redis/package.json b/packages/modules/event-bus-redis/package.json index 680f35f164..5c7056b4bf 100644 --- a/packages/modules/event-bus-redis/package.json +++ b/packages/modules/event-bus-redis/package.json @@ -35,7 +35,7 @@ "dependencies": { "@medusajs/modules-sdk": "^1.12.11", "@medusajs/utils": "^1.11.9", - "bullmq": "5.12.0", + "bullmq": "5.13.0", "ioredis": "^5.4.1" }, "peerDependencies": { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts index b2c627bcd1..69a01034d9 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts @@ -1,7 +1,9 @@ import { StepResponse, + WorkflowResponse, createStep, createWorkflow, + parallelize, } from "@medusajs/workflows-sdk" import { setTimeout } from "timers/promises" @@ -17,9 +19,9 @@ const step_1_background = createStep( }) ) -createWorkflow( +const nestedWorkflow = createWorkflow( { - name: "workflow_async_background", + name: "nested_sub_flow_async", }, function (input) { const resp = step_1_background(input) @@ -27,3 +29,25 @@ createWorkflow( return resp } ) + +createWorkflow( + { + name: "workflow_async_background", + }, + function (input) { + const [ret] = parallelize( + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_1" }), + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_2" }) + ) + + return new WorkflowResponse(ret) + } +) diff --git a/packages/modules/workflow-engine-inmemory/package.json b/packages/modules/workflow-engine-inmemory/package.json index cb60ad9477..891ba83bd4 100644 --- a/packages/modules/workflow-engine-inmemory/package.json +++ b/packages/modules/workflow-engine-inmemory/package.json @@ -25,7 +25,7 @@ "watch:test": "tsc --build tsconfig.spec.json --watch", "build": "rimraf dist && tsc --build && tsc-alias -p tsconfig.json", "test": "jest --passWithNoTests --runInBand --bail --forceExit -- src", - "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", + "test:integration": "jest --silent --forceExit -- integration-tests/**/__tests__/**/*.ts", "migration:generate": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:generate", "migration:initial": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:create --initial", "migration:create": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:create", diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index b0d23c0050..ccb849b242 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -11,6 +11,7 @@ import { InjectSharedContext, MedusaContext, MedusaError, + TransactionState, isString, } from "@medusajs/utils" import { @@ -97,6 +98,29 @@ export class WorkflowOrchestratorService { WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage) } + private async triggerParentStep(transaction, result) { + const metadata = transaction.flow.metadata + const { parentStepIdempotencyKey } = metadata ?? {} + if (parentStepIdempotencyKey) { + const hasFailed = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(transaction.flow.state) + + if (hasFailed) { + await this.setStepFailure({ + idempotencyKey: parentStepIdempotencyKey, + stepResponse: result, + }) + } else { + await this.setStepSuccess({ + idempotencyKey: parentStepIdempotencyKey, + stepResponse: result, + }) + } + } + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, @@ -155,14 +179,25 @@ export class WorkflowOrchestratorService { events, }) - // TODO: temporary + const hasFinished = ret.transaction.hasFinished() + const metadata = ret.transaction.getFlow().metadata + const { parentStepIdempotencyKey } = metadata ?? {} + const hasFailed = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(ret.transaction.getFlow().state) + const acknowledgement = { transactionId: context.transactionId, workflowId: workflowId, + parentStepIdempotencyKey, + hasFinished, + hasFailed, } if (ret.transaction.hasFinished()) { const { result, errors } = ret + this.notify({ eventType: "onFinish", workflowId, @@ -170,6 +205,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return { acknowledgement, ...ret } @@ -261,6 +298,7 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret + this.notify({ eventType: "onFinish", workflowId, @@ -268,6 +306,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return ret @@ -325,6 +365,7 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret + this.notify({ eventType: "onFinish", workflowId, @@ -332,6 +373,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return ret diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts index b2c627bcd1..5a6250a8c1 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts @@ -1,7 +1,9 @@ import { StepResponse, + WorkflowResponse, createStep, createWorkflow, + parallelize, } from "@medusajs/workflows-sdk" import { setTimeout } from "timers/promises" @@ -11,15 +13,15 @@ const step_1_background = createStep( async: true, }, jest.fn(async (input) => { - await setTimeout(200) + await setTimeout(Math.random() * 300) return new StepResponse(input) }) ) -createWorkflow( +const nestedWorkflow = createWorkflow( { - name: "workflow_async_background", + name: "nested_sub_flow_async", }, function (input) { const resp = step_1_background(input) @@ -27,3 +29,35 @@ createWorkflow( return resp } ) + +createWorkflow( + { + name: "workflow_async_background", + }, + function (input) { + const [ret] = parallelize( + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_1" }), + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_2" }), + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_3" }), + nestedWorkflow + .runAsStep({ + input, + }) + .config({ name: "step_sub_flow_4" }) + ) + + return new WorkflowResponse(ret) + } +) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 7e3c3a439d..6de6875de0 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -23,7 +23,7 @@ import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { TestDatabase } from "../utils" -jest.setTimeout(100000) +jest.setTimeout(999900000) moduleIntegrationTestRunner({ moduleName: Modules.WORKFLOW_ENGINE, @@ -35,10 +35,8 @@ moduleIntegrationTestRunner({ }, testSuite: ({ service: workflowOrcModule, medusaApp }) => { describe("Workflow Orchestrator module", function () { - const afterEach_ = async () => { + beforeEach(async () => { await TestDatabase.clearTables() - } - beforeEach(() => { jest.clearAllMocks() }) @@ -93,8 +91,6 @@ moduleIntegrationTestRunner({ }) describe("Testing basic workflow", function () { - afterEach(afterEach_) - it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { await workflowOrcModule.run("workflow_1", { input: { @@ -271,34 +267,31 @@ moduleIntegrationTestRunner({ ).toBe(true) }) - it("should complete an async workflow that returns a StepResponse", async () => { - const { transaction, result } = await workflowOrcModule.run( - "workflow_async_background", - { + it("should complete an async workflow that returns a StepResponse", (done) => { + const transactionId = "transaction_1" + void workflowOrcModule + .run("workflow_async_background", { input: { myInput: "123", }, - transactionId: "transaction_1", - throwOnError: false, - } - ) + transactionId, + throwOnError: true, + }) + .then(({ transaction, result }) => { + expect(transaction.flow.state).toEqual( + TransactionStepState.INVOKING + ) + expect(result).toEqual(undefined) + }) - expect(transaction.flow.state).toEqual(TransactionStepState.INVOKING) - expect(result).toEqual(undefined) - - await setTimeout(205) - - const trx = await workflowOrcModule.run("workflow_async_background", { - input: { - myInput: "123", + void workflowOrcModule.subscribe({ + workflowId: "workflow_async_background", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + done() + } }, - transactionId: "transaction_1", - throwOnError: false, - }) - - expect(trx.transaction.flow.state).toEqual(TransactionStepState.DONE) - expect(trx.result).toEqual({ - myInput: "123", }) }) diff --git a/packages/modules/workflow-engine-redis/package.json b/packages/modules/workflow-engine-redis/package.json index e56f551234..9d199a5727 100644 --- a/packages/modules/workflow-engine-redis/package.json +++ b/packages/modules/workflow-engine-redis/package.json @@ -25,7 +25,7 @@ "watch:test": "tsc --build tsconfig.spec.json --watch", "build": "rimraf dist && tsc --build && tsc-alias -p tsconfig.json", "test": "jest --passWithNoTests --runInBand --bail --forceExit -- src", - "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", + "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", "migration:generate": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:generate", "migration:initial": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:create --initial", "migration:create": " MIKRO_ORM_CLI=./mikro-orm.config.dev.ts mikro-orm migration:create", @@ -47,13 +47,14 @@ "@medusajs/orchestration": "^0.5.7", "@medusajs/utils": "^1.11.9", "@medusajs/workflows-sdk": "^0.1.6", - "bullmq": "5.12.0", + "bullmq": "5.13.0", "ioredis": "^5.4.1" }, "peerDependencies": { "@mikro-orm/core": "5.9.7", "@mikro-orm/migrations": "5.9.7", "@mikro-orm/postgresql": "5.9.7", - "awilix": "^8.0.1" + "awilix": "^8.0.1", + "ulid": "^2.3.0" } } diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index d2c9d67468..30d5c7e713 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -12,7 +12,12 @@ import { Logger, MedusaContainer, } from "@medusajs/types" -import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" +import { + InjectSharedContext, + MedusaContext, + TransactionState, + isString, +} from "@medusajs/utils" import { FlowRunOptions, MedusaWorkflow, @@ -146,6 +151,29 @@ export class WorkflowOrchestratorService { await this.redisDistributedTransactionStorage_.onApplicationStart() } + private async triggerParentStep(transaction, result) { + const metadata = transaction.flow.metadata + const { parentStepIdempotencyKey } = metadata ?? {} + if (parentStepIdempotencyKey) { + const hasFailed = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(transaction.flow.state) + + if (hasFailed) { + await this.setStepFailure({ + idempotencyKey: parentStepIdempotencyKey, + stepResponse: result, + }) + } else { + await this.setStepSuccess({ + idempotencyKey: parentStepIdempotencyKey, + stepResponse: result, + }) + } + } + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, @@ -180,16 +208,12 @@ export class WorkflowOrchestratorService { transactionId: context.transactionId, }) - const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) + const exportedWorkflow = MedusaWorkflow.getWorkflow(workflowId) if (!exportedWorkflow) { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow( - (container as MedusaContainer) ?? this.container_ - ) - - const ret = await flow.run({ + const ret = await exportedWorkflow.run({ input, throwOnError, logOnError, @@ -198,14 +222,25 @@ export class WorkflowOrchestratorService { events, }) - // TODO: temporary + const hasFinished = ret.transaction.hasFinished() + const metadata = ret.transaction.getFlow().metadata + const { parentStepIdempotencyKey } = metadata ?? {} + const hasFailed = [ + TransactionState.REVERTED, + TransactionState.FAILED, + ].includes(ret.transaction.getFlow().state) + const acknowledgement = { transactionId: context.transactionId, workflowId: workflowId, + parentStepIdempotencyKey, + hasFinished, + hasFailed, } if (ret.transaction.hasFinished()) { const { result, errors } = ret + await this.notify({ eventType: "onFinish", workflowId, @@ -213,6 +248,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return { acknowledgement, ...ret } @@ -243,12 +280,11 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow( - (container as MedusaContainer) ?? this.container_ + const transaction = await exportedWorkflow.getRunningTransaction( + transactionId, + context ) - const transaction = await flow.getRunningTransaction(transactionId, context) - return transaction } @@ -282,17 +318,13 @@ export class WorkflowOrchestratorService { throw new Error(`Workflow with id "${workflowId}" not found.`) } - const flow = exportedWorkflow( - (container as MedusaContainer) ?? this.container_ - ) - const events = this.buildWorkflowEvents({ customEventHandlers: eventHandlers, transactionId, workflowId, }) - const ret = await flow.registerStepSuccess({ + const ret = await exportedWorkflow.registerStepSuccess({ idempotencyKey: idempotencyKey_, context, resultFrom, @@ -304,6 +336,7 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret + await this.notify({ eventType: "onFinish", workflowId, @@ -311,6 +344,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return ret @@ -368,6 +403,7 @@ export class WorkflowOrchestratorService { if (ret.transaction.hasFinished()) { const { result, errors } = ret + await this.notify({ eventType: "onFinish", workflowId, @@ -375,6 +411,8 @@ export class WorkflowOrchestratorService { result, errors, }) + + await this.triggerParentStep(ret.transaction, result) } return ret diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 6ea40c393b..c9e1ece600 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -66,16 +66,16 @@ export class RedisDistributedTransactionStorage } async onApplicationStart() { + const allowedJobs = [ + JobType.RETRY, + JobType.STEP_TIMEOUT, + JobType.TRANSACTION_TIMEOUT, + ] + this.worker = new Worker( this.queueName, async (job) => { - const allJobs = [ - JobType.RETRY, - JobType.STEP_TIMEOUT, - JobType.TRANSACTION_TIMEOUT, - ] - - if (allJobs.includes(job.name as JobType)) { + if (allowedJobs.includes(job.name as JobType)) { await this.executeTransaction( job.data.workflowId, job.data.transactionId diff --git a/yarn.lock b/yarn.lock index 086edeb606..01142baf30 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5855,7 +5855,7 @@ __metadata: "@medusajs/modules-sdk": ^1.12.11 "@medusajs/types": ^1.11.16 "@medusajs/utils": ^1.11.9 - bullmq: 5.12.0 + bullmq: 5.13.0 cross-env: ^5.2.1 ioredis: ^5.4.1 jest: ^29.7.0 @@ -6776,7 +6776,7 @@ __metadata: "@medusajs/utils": ^1.11.9 "@medusajs/workflows-sdk": ^0.1.6 "@mikro-orm/cli": 5.9.7 - bullmq: 5.12.0 + bullmq: 5.13.0 cross-env: ^5.2.1 ioredis: ^5.4.1 jest: ^29.7.0 @@ -6790,6 +6790,7 @@ __metadata: "@mikro-orm/migrations": 5.9.7 "@mikro-orm/postgresql": 5.9.7 awilix: ^8.0.1 + ulid: ^2.3.0 languageName: unknown linkType: soft @@ -16196,9 +16197,9 @@ __metadata: languageName: node linkType: hard -"bullmq@npm:5.12.0": - version: 5.12.0 - resolution: "bullmq@npm:5.12.0" +"bullmq@npm:5.13.0": + version: 5.13.0 + resolution: "bullmq@npm:5.13.0" dependencies: cron-parser: ^4.6.0 ioredis: ^5.4.1 @@ -16207,7 +16208,7 @@ __metadata: semver: ^7.5.4 tslib: ^2.0.0 uuid: ^9.0.0 - checksum: ade12a22c16db021385bdfab826ab64488fcfb07274df1b6269bf513d073ea8e8a812b7308027a16df3f00feaf1d2afe79c8c7c86749b44794c2a2e89e34a411 + checksum: 28435676dbd0a1d5085540095eb6ad338a0d07fe37e8268833d5d8d4d5e1895dc62c9ce288d5493ce0af664b8502e5eff723dd964ed5c2b2a2f5d1ee7b9a27de languageName: node linkType: hard