From a164c0d5126a40e2bc669f9fc2883be502a15036 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Wed, 3 Apr 2024 06:17:00 -0300 Subject: [PATCH] feat(workflows-sdk,orchestration): async step as background task (#6886) --- .changeset/eighty-dodos-peel.md | 7 ++ .../transaction/transaction-orchestrator.ts | 41 +++++++-- .../src/transaction/transaction-step.ts | 5 +- .../orchestration/src/transaction/types.ts | 8 +- .../src/workflow/workflow-manager.ts | 13 ++- .../integration-tests/__fixtures__/index.ts | 1 + .../__fixtures__/workflow_async.ts | 29 +++++++ .../integration-tests/__tests__/index.spec.ts | 30 ++++++- .../integration-tests/__fixtures__/index.ts | 1 + .../__fixtures__/workflow_1.ts | 2 - .../__fixtures__/workflow_2.ts | 2 - .../__fixtures__/workflow_async.ts | 29 +++++++ .../__fixtures__/workflow_step_timeout.ts | 2 +- .../integration-tests/__tests__/index.spec.ts | 61 ++++++++++++- .../src/services/workflows-module.ts | 2 +- .../utils/workflow-orchestrator-storage.ts | 5 +- .../src/utils/composer/create-step.ts | 86 +++++++++++++++---- 17 files changed, 290 insertions(+), 34 deletions(-) create mode 100644 .changeset/eighty-dodos-peel.md create mode 100644 packages/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts create mode 100644 packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts diff --git a/.changeset/eighty-dodos-peel.md b/.changeset/eighty-dodos-peel.md new file mode 100644 index 0000000000..d4147beef1 --- /dev/null +++ b/.changeset/eighty-dodos-peel.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +"@medusajs/workflow-engine-redis": patch +--- + +Async steps marked as success if return StepResponse diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index ae19b20995..d706b2a8ba 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -639,7 +639,7 @@ export class TransactionOrchestrator extends EventEmitter { error: Error | any, { endRetry }: { endRetry?: boolean } = {} ) => { - const ret = TransactionOrchestrator.setStepFailure( + await TransactionOrchestrator.setStepFailure( transaction, step, error, @@ -652,15 +652,20 @@ export class TransactionOrchestrator extends EventEmitter { step.definition.retryInterval ?? 0 ) } - - return ret } if (!isAsync) { hasSyncSteps = true execution.push( transaction - .handler(step.definition.action + "", type, payload, transaction) + .handler( + step.definition.action + "", + type, + payload, + transaction, + step, + this + ) .then(async (response: any) => { if (this.hasExpired({ transaction, step }, Date.now())) { await this.checkStepTimeout(transaction, step) @@ -703,8 +708,34 @@ export class TransactionOrchestrator extends EventEmitter { step.definition.action + "", type, payload, - transaction + transaction, + step, + this ) + .then(async (response: any) => { + if (!step.definition.backgroundExecution) { + return + } + + if (this.hasExpired({ transaction, step }, Date.now())) { + await this.checkStepTimeout(transaction, step) + await this.checkTransactionTimeout( + transaction, + nextSteps.next.includes(step) ? nextSteps.next : [step] + ) + } + + await TransactionOrchestrator.setStepSuccess( + transaction, + step, + response + ) + + await transaction.scheduleRetry( + step, + step.definition.retryInterval ?? 0 + ) + }) .catch(async (error) => { if ( PermanentStepFailureError.isPermanentStepFailureError(error) diff --git a/packages/orchestration/src/transaction/transaction-step.ts b/packages/orchestration/src/transaction/transaction-step.ts index bf20b5635a..c5c8016282 100644 --- a/packages/orchestration/src/transaction/transaction-step.ts +++ b/packages/orchestration/src/transaction/transaction-step.ts @@ -3,6 +3,7 @@ import { DistributedTransaction, TransactionPayload, } from "./distributed-transaction" +import { TransactionOrchestrator } from "./transaction-orchestrator" import { TransactionHandlerType, TransactionState, @@ -14,7 +15,9 @@ export type TransactionStepHandler = ( actionId: string, handlerType: TransactionHandlerType, payload: TransactionPayload, - transaction?: DistributedTransaction + transaction: DistributedTransaction, + step: TransactionStep, + orchestrator: TransactionOrchestrator ) => Promise /** diff --git a/packages/orchestration/src/transaction/types.ts b/packages/orchestration/src/transaction/types.ts index bd5abab5ca..1ab854c72d 100644 --- a/packages/orchestration/src/transaction/types.ts +++ b/packages/orchestration/src/transaction/types.ts @@ -59,11 +59,17 @@ export type TransactionStepsDefinition = { /** * If true, the step is executed asynchronously. This means that the workflow will not wait for the response of this step. - * Async steps require to have their responses set using "setStepSuccess" or "setStepFailure". + * Async steps require to have their responses set using "setStepSuccess" or "setStepFailure", unless it is combined with "backgroundExecution: true". * If combined with a timeout, and any response is not set within that interval, the step will be marked as "TransactionStepStatus.TIMEOUT" and the workflow will be reverted immediately. */ async?: boolean + /** + * It applies to "async" steps only, allowing them to run in the background and automatically complete without external intervention. + * It is ideal for time-consuming tasks that will be complete after the execution, contrasting with standard "async" operations that require a response to be set in a later stage. + */ + backgroundExecution?: boolean + /** * If true, the compensation function for this step is executed asynchronously. Which means, the response has to be set using "setStepSuccess" or "setStepFailure". */ diff --git a/packages/orchestration/src/workflow/workflow-manager.ts b/packages/orchestration/src/workflow/workflow-manager.ts index fa75f8a6e3..a2ec8fd69a 100644 --- a/packages/orchestration/src/workflow/workflow-manager.ts +++ b/packages/orchestration/src/workflow/workflow-manager.ts @@ -6,6 +6,7 @@ import { TransactionMetadata, TransactionModelOptions, TransactionOrchestrator, + TransactionStep, TransactionStepHandler, TransactionStepsDefinition, } from "../transaction" @@ -39,12 +40,14 @@ export type WorkflowStepHandlerArguments = { compensate: { [actions: string]: unknown } metadata: TransactionMetadata transaction: DistributedTransaction + step: TransactionStep + orchestrator: TransactionOrchestrator context?: Context } export type WorkflowStepHandler = ( args: WorkflowStepHandlerArguments -) => unknown +) => Promise export class WorkflowManager { protected static workflows: Map = new Map() @@ -173,8 +176,10 @@ export class WorkflowManager { return async ( actionId: string, handlerType: TransactionHandlerType, - payload?: any, - transaction?: DistributedTransaction + payload: any, + transaction: DistributedTransaction, + step: TransactionStep, + orchestrator: TransactionOrchestrator ) => { const command = handlers.get(actionId) @@ -196,6 +201,8 @@ export class WorkflowManager { compensate, metadata, transaction: transaction as DistributedTransaction, + step, + orchestrator, context, }) } diff --git a/packages/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts b/packages/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts index 987a8a99bd..c5e62ad463 100644 --- a/packages/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts +++ b/packages/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts @@ -1,4 +1,5 @@ export * from "./workflow_1" export * from "./workflow_2" +export * from "./workflow_async" export * from "./workflow_step_timeout" export * from "./workflow_transaction_timeout" diff --git a/packages/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts b/packages/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts new file mode 100644 index 0000000000..b2c627bcd1 --- /dev/null +++ b/packages/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_async.ts @@ -0,0 +1,29 @@ +import { + StepResponse, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { setTimeout } from "timers/promises" + +const step_1_background = createStep( + { + name: "step_1_background", + async: true, + }, + jest.fn(async (input) => { + await setTimeout(200) + + return new StepResponse(input) + }) +) + +createWorkflow( + { + name: "workflow_async_background", + }, + function (input) { + const resp = step_1_background(input) + + return resp + } +) diff --git a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 9b9d365485..e37fa50b48 100644 --- a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -5,8 +5,8 @@ import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" -import { DB_URL, TestDatabase } from "../utils" import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" +import { DB_URL, TestDatabase } from "../utils" const sharedPgConnection = knex({ client: "pg", @@ -172,5 +172,33 @@ describe("Workflow Orchestrator module", function () { expect(transaction.flow.state).toEqual("reverted") }) + + it("should subsctibe to a async workflow and receive the response when it finishes", (done) => { + const transactionId = "trx_123" + + const onFinish = jest.fn(() => { + done() + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow_async_background", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + onFinish() + } + }, + }) + + void workflowOrcModule.run("workflow_async_background", { + input: { + myInput: "123", + }, + transactionId, + throwOnError: false, + }) + + expect(onFinish).toHaveBeenCalledTimes(0) + }) }) }) diff --git a/packages/workflow-engine-redis/integration-tests/__fixtures__/index.ts b/packages/workflow-engine-redis/integration-tests/__fixtures__/index.ts index 987a8a99bd..c5e62ad463 100644 --- a/packages/workflow-engine-redis/integration-tests/__fixtures__/index.ts +++ b/packages/workflow-engine-redis/integration-tests/__fixtures__/index.ts @@ -1,4 +1,5 @@ export * from "./workflow_1" export * from "./workflow_2" +export * from "./workflow_async" export * from "./workflow_step_timeout" export * from "./workflow_transaction_timeout" diff --git a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts index cb0056466e..7f03848dda 100644 --- a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts +++ b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_1.ts @@ -25,8 +25,6 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - console.log("triggered async request", context.metadata.idempotency_key) - if (input) { return new StepResponse({ notAsyncResponse: input.hey }) } diff --git a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts index f15d51889f..89e8a7e39b 100644 --- a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts +++ b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_2.ts @@ -25,8 +25,6 @@ const step_1 = createStep( const step_2 = createStep( "step_2", jest.fn((input, context) => { - console.log("triggered async request", context.metadata.idempotency_key) - if (input) { return new StepResponse({ notAsyncResponse: input.hey }) } diff --git a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts new file mode 100644 index 0000000000..b2c627bcd1 --- /dev/null +++ b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_async.ts @@ -0,0 +1,29 @@ +import { + StepResponse, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { setTimeout } from "timers/promises" + +const step_1_background = createStep( + { + name: "step_1_background", + async: true, + }, + jest.fn(async (input) => { + await setTimeout(200) + + return new StepResponse(input) + }) +) + +createWorkflow( + { + name: "workflow_async_background", + }, + function (input) { + const resp = step_1_background(input) + + return resp + } +) diff --git a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_step_timeout.ts b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_step_timeout.ts index 0bdbf9fd9c..ea56d469ad 100644 --- a/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_step_timeout.ts +++ b/packages/workflow-engine-redis/integration-tests/__fixtures__/workflow_step_timeout.ts @@ -22,7 +22,7 @@ const step_1_async = createStep( }, jest.fn(async (input) => { - return new StepResponse(input, { compensate: 123 }) + return }) ) diff --git a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 4a8493cf31..0763c652dd 100644 --- a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -4,7 +4,7 @@ import { TransactionTimeoutError, } from "@medusajs/orchestration" import { RemoteQueryFunction } from "@medusajs/types" -import { TransactionHandlerType } from "@medusajs/utils" +import { TransactionHandlerType, TransactionStepState } from "@medusajs/utils" import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" @@ -237,5 +237,64 @@ describe("Workflow Orchestrator module", function () { TransactionTimeoutError.isTransactionTimeoutError(errors[0].error) ).toBe(true) }) + + it("should complete an async workflow that returns a StepResponse", async () => { + const { transaction, result } = await workflowOrcModule.run( + "workflow_async_background", + { + input: { + myInput: "123", + }, + transactionId: "transaction_1", + throwOnError: false, + } + ) + + expect(transaction.flow.state).toEqual(TransactionStepState.INVOKING) + expect(result).toEqual(undefined) + + await setTimeout(205) + + const trx = await workflowOrcModule.run("workflow_async_background", { + input: { + myInput: "123", + }, + transactionId: "transaction_1", + throwOnError: false, + }) + + expect(trx.transaction.flow.state).toEqual(TransactionStepState.DONE) + expect(trx.result).toEqual({ + myInput: "123", + }) + }) + + it("should subsctibe to a async workflow and receive the response when it finishes", (done) => { + const transactionId = "trx_123" + + const onFinish = jest.fn(() => { + done() + }) + + void workflowOrcModule.run("workflow_async_background", { + input: { + myInput: "123", + }, + transactionId, + throwOnError: false, + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow_async_background", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + onFinish() + } + }, + }) + + expect(onFinish).toHaveBeenCalledTimes(0) + }) }) }) diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index d0fb245f83..c53542703e 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -9,9 +9,9 @@ import { import { InjectManager, InjectSharedContext, - isString, MedusaContext, MedusaError, + isString, } from "@medusajs/utils" import type { IWorkflowEngineService, diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 6717f0bd1e..bd718f60ca 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -190,7 +190,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt stepId: step.id, }, { - delay: interval * 1000, + delay: interval > 0 ? interval * 1000 : undefined, jobId: this.getJobId(JobType.RETRY, transaction, step), removeOnComplete: true, } @@ -266,6 +266,9 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt if (step) { key.push(step.id, step.attempts + "") + if (step.isCompensating()) { + key.push("compensate") + } } return key.join(":") diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index 7df37e6f2f..10dd2ce961 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -1,6 +1,7 @@ import { TransactionStepsDefinition, WorkflowManager, + WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils" @@ -121,25 +122,23 @@ function applyStep< } const handler = { - invoke: async (transactionContext: WorkflowStepHandlerArguments) => { - const metadata = transactionContext.metadata + invoke: async (stepArguments: WorkflowStepHandlerArguments) => { + const metadata = stepArguments.metadata const idempotencyKey = metadata.idempotency_key - transactionContext.context!.idempotencyKey = idempotencyKey + stepArguments.context!.idempotencyKey = idempotencyKey const executionContext: StepExecutionContext = { workflowId: metadata.model_id, stepName: metadata.action, action: "invoke", idempotencyKey, attempt: metadata.attempt, - container: transactionContext.container, + container: stepArguments.container, metadata, - context: transactionContext.context!, + context: stepArguments.context!, } - const argInput = input - ? await resolveValue(input, transactionContext) - : {} + const argInput = input ? await resolveValue(input, stepArguments) : {} const stepResponse: StepResponse = await invokeFn.apply( this, [argInput, executionContext] @@ -156,11 +155,11 @@ function applyStep< } }, compensate: compensateFn - ? async (transactionContext: WorkflowStepHandlerArguments) => { - const metadata = transactionContext.metadata + ? async (stepArguments: WorkflowStepHandlerArguments) => { + const metadata = stepArguments.metadata const idempotencyKey = metadata.idempotency_key - transactionContext.context!.idempotencyKey = idempotencyKey + stepArguments.context!.idempotencyKey = idempotencyKey const executionContext: StepExecutionContext = { workflowId: metadata.model_id, @@ -168,13 +167,12 @@ function applyStep< action: "compensate", idempotencyKey, attempt: metadata.attempt, - container: transactionContext.container, + container: stepArguments.container, metadata, - context: transactionContext.context!, + context: stepArguments.context!, } - const stepOutput = (transactionContext.invoke[stepName] as any) - ?.output + const stepOutput = (stepArguments.invoke[stepName] as any)?.output const invokeResult = stepOutput?.__type === OrchestrationUtils.SymbolWorkflowStepResponse @@ -191,6 +189,8 @@ function applyStep< : undefined, } + wrapAsyncHandler(stepConfig, handler) + stepConfig.uuid = ulid() stepConfig.noCompensation = !compensateFn @@ -231,6 +231,62 @@ function applyStep< } } +/** + * @internal + * + * Internal function to handle async steps to be automatically marked as completed after they are executed. + * + * @param stepConfig + * @param handle + */ +function wrapAsyncHandler( + stepConfig: TransactionStepsDefinition, + handle: { + invoke: WorkflowStepHandler + compensate?: WorkflowStepHandler + } +) { + if (stepConfig.async) { + if (typeof handle.invoke === "function") { + const originalInvoke = handle.invoke + + handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => { + const response = (await originalInvoke(stepArguments)) as any + if ( + response?.output?.__type !== + OrchestrationUtils.SymbolWorkflowStepResponse + ) { + return + } + + stepArguments.step.definition.backgroundExecution = true + return response + } + } + } + + if (stepConfig.compensateAsync) { + if (typeof handle.compensate === "function") { + const originalCompensate = handle.compensate! + handle.compensate = async ( + stepArguments: WorkflowStepHandlerArguments + ) => { + const response = (await originalCompensate(stepArguments)) as any + + if ( + response?.output?.__type !== + OrchestrationUtils.SymbolWorkflowStepResponse + ) { + return + } + stepArguments.step.definition.backgroundExecution = true + + return response + } + } + } +} + /** * This function creates a {@link StepFunction} that can be used as a step in a workflow constructed by the {@link createWorkflow} function. *