feat(workflows-sdk,orchestration): async step as background task (#6886)

This commit is contained in:
Carlos R. L. Rodrigues
2024-04-03 06:17:00 -03:00
committed by GitHub
parent 3dcf5224a1
commit a164c0d512
17 changed files with 290 additions and 34 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/orchestration": patch
"@medusajs/workflows-sdk": patch
"@medusajs/workflow-engine-redis": patch
---
Async steps marked as success if return StepResponse

View File

@@ -639,7 +639,7 @@ export class TransactionOrchestrator extends EventEmitter {
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
) => {
const ret = TransactionOrchestrator.setStepFailure(
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
@@ -652,15 +652,20 @@ export class TransactionOrchestrator extends EventEmitter {
step.definition.retryInterval ?? 0
)
}
return ret
}
if (!isAsync) {
hasSyncSteps = true
execution.push(
transaction
.handler(step.definition.action + "", type, payload, transaction)
.handler(
step.definition.action + "",
type,
payload,
transaction,
step,
this
)
.then(async (response: any) => {
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
@@ -703,8 +708,34 @@ export class TransactionOrchestrator extends EventEmitter {
step.definition.action + "",
type,
payload,
transaction
transaction,
step,
this
)
.then(async (response: any) => {
if (!step.definition.backgroundExecution) {
return
}
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
await this.checkTransactionTimeout(
transaction,
nextSteps.next.includes(step) ? nextSteps.next : [step]
)
}
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
await transaction.scheduleRetry(
step,
step.definition.retryInterval ?? 0
)
})
.catch(async (error) => {
if (
PermanentStepFailureError.isPermanentStepFailureError(error)

View File

@@ -3,6 +3,7 @@ import {
DistributedTransaction,
TransactionPayload,
} from "./distributed-transaction"
import { TransactionOrchestrator } from "./transaction-orchestrator"
import {
TransactionHandlerType,
TransactionState,
@@ -14,7 +15,9 @@ export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload,
transaction?: DistributedTransaction
transaction: DistributedTransaction,
step: TransactionStep,
orchestrator: TransactionOrchestrator
) => Promise<unknown>
/**

View File

@@ -59,11 +59,17 @@ export type TransactionStepsDefinition = {
/**
* If true, the step is executed asynchronously. This means that the workflow will not wait for the response of this step.
* Async steps require to have their responses set using "setStepSuccess" or "setStepFailure".
* Async steps require to have their responses set using "setStepSuccess" or "setStepFailure", unless it is combined with "backgroundExecution: true".
* If combined with a timeout, and any response is not set within that interval, the step will be marked as "TransactionStepStatus.TIMEOUT" and the workflow will be reverted immediately.
*/
async?: boolean
/**
* It applies to "async" steps only, allowing them to run in the background and automatically complete without external intervention.
* It is ideal for time-consuming tasks that will be complete after the execution, contrasting with standard "async" operations that require a response to be set in a later stage.
*/
backgroundExecution?: boolean
/**
* If true, the compensation function for this step is executed asynchronously. Which means, the response has to be set using "setStepSuccess" or "setStepFailure".
*/

View File

@@ -6,6 +6,7 @@ import {
TransactionMetadata,
TransactionModelOptions,
TransactionOrchestrator,
TransactionStep,
TransactionStepHandler,
TransactionStepsDefinition,
} from "../transaction"
@@ -39,12 +40,14 @@ export type WorkflowStepHandlerArguments = {
compensate: { [actions: string]: unknown }
metadata: TransactionMetadata
transaction: DistributedTransaction
step: TransactionStep
orchestrator: TransactionOrchestrator
context?: Context
}
export type WorkflowStepHandler = (
args: WorkflowStepHandlerArguments
) => unknown
) => Promise<unknown>
export class WorkflowManager {
protected static workflows: Map<string, WorkflowDefinition> = new Map()
@@ -173,8 +176,10 @@ export class WorkflowManager {
return async (
actionId: string,
handlerType: TransactionHandlerType,
payload?: any,
transaction?: DistributedTransaction
payload: any,
transaction: DistributedTransaction,
step: TransactionStep,
orchestrator: TransactionOrchestrator
) => {
const command = handlers.get(actionId)
@@ -196,6 +201,8 @@ export class WorkflowManager {
compensate,
metadata,
transaction: transaction as DistributedTransaction,
step,
orchestrator,
context,
})
}

View File

@@ -1,4 +1,5 @@
export * from "./workflow_1"
export * from "./workflow_2"
export * from "./workflow_async"
export * from "./workflow_step_timeout"
export * from "./workflow_transaction_timeout"

View File

@@ -0,0 +1,29 @@
import {
StepResponse,
createStep,
createWorkflow,
} from "@medusajs/workflows-sdk"
import { setTimeout } from "timers/promises"
const step_1_background = createStep(
{
name: "step_1_background",
async: true,
},
jest.fn(async (input) => {
await setTimeout(200)
return new StepResponse(input)
})
)
createWorkflow(
{
name: "workflow_async_background",
},
function (input) {
const resp = step_1_background(input)
return resp
}
)

View File

@@ -5,8 +5,8 @@ import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
import { knex } from "knex"
import { setTimeout } from "timers/promises"
import "../__fixtures__"
import { DB_URL, TestDatabase } from "../utils"
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
import { DB_URL, TestDatabase } from "../utils"
const sharedPgConnection = knex<any, any>({
client: "pg",
@@ -172,5 +172,33 @@ describe("Workflow Orchestrator module", function () {
expect(transaction.flow.state).toEqual("reverted")
})
it("should subsctibe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const onFinish = jest.fn(() => {
done()
})
void workflowOrcModule.subscribe({
workflowId: "workflow_async_background",
transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
onFinish()
}
},
})
void workflowOrcModule.run("workflow_async_background", {
input: {
myInput: "123",
},
transactionId,
throwOnError: false,
})
expect(onFinish).toHaveBeenCalledTimes(0)
})
})
})

View File

@@ -1,4 +1,5 @@
export * from "./workflow_1"
export * from "./workflow_2"
export * from "./workflow_async"
export * from "./workflow_step_timeout"
export * from "./workflow_transaction_timeout"

View File

@@ -25,8 +25,6 @@ const step_1 = createStep(
const step_2 = createStep(
"step_2",
jest.fn((input, context) => {
console.log("triggered async request", context.metadata.idempotency_key)
if (input) {
return new StepResponse({ notAsyncResponse: input.hey })
}

View File

@@ -25,8 +25,6 @@ const step_1 = createStep(
const step_2 = createStep(
"step_2",
jest.fn((input, context) => {
console.log("triggered async request", context.metadata.idempotency_key)
if (input) {
return new StepResponse({ notAsyncResponse: input.hey })
}

View File

@@ -0,0 +1,29 @@
import {
StepResponse,
createStep,
createWorkflow,
} from "@medusajs/workflows-sdk"
import { setTimeout } from "timers/promises"
const step_1_background = createStep(
{
name: "step_1_background",
async: true,
},
jest.fn(async (input) => {
await setTimeout(200)
return new StepResponse(input)
})
)
createWorkflow(
{
name: "workflow_async_background",
},
function (input) {
const resp = step_1_background(input)
return resp
}
)

View File

@@ -22,7 +22,7 @@ const step_1_async = createStep(
},
jest.fn(async (input) => {
return new StepResponse(input, { compensate: 123 })
return
})
)

View File

@@ -4,7 +4,7 @@ import {
TransactionTimeoutError,
} from "@medusajs/orchestration"
import { RemoteQueryFunction } from "@medusajs/types"
import { TransactionHandlerType } from "@medusajs/utils"
import { TransactionHandlerType, TransactionStepState } from "@medusajs/utils"
import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
import { knex } from "knex"
import { setTimeout } from "timers/promises"
@@ -237,5 +237,64 @@ describe("Workflow Orchestrator module", function () {
TransactionTimeoutError.isTransactionTimeoutError(errors[0].error)
).toBe(true)
})
it("should complete an async workflow that returns a StepResponse", async () => {
const { transaction, result } = await workflowOrcModule.run(
"workflow_async_background",
{
input: {
myInput: "123",
},
transactionId: "transaction_1",
throwOnError: false,
}
)
expect(transaction.flow.state).toEqual(TransactionStepState.INVOKING)
expect(result).toEqual(undefined)
await setTimeout(205)
const trx = await workflowOrcModule.run("workflow_async_background", {
input: {
myInput: "123",
},
transactionId: "transaction_1",
throwOnError: false,
})
expect(trx.transaction.flow.state).toEqual(TransactionStepState.DONE)
expect(trx.result).toEqual({
myInput: "123",
})
})
it("should subsctibe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const onFinish = jest.fn(() => {
done()
})
void workflowOrcModule.run("workflow_async_background", {
input: {
myInput: "123",
},
transactionId,
throwOnError: false,
})
void workflowOrcModule.subscribe({
workflowId: "workflow_async_background",
transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
onFinish()
}
},
})
expect(onFinish).toHaveBeenCalledTimes(0)
})
})
})

View File

@@ -9,9 +9,9 @@ import {
import {
InjectManager,
InjectSharedContext,
isString,
MedusaContext,
MedusaError,
isString,
} from "@medusajs/utils"
import type {
IWorkflowEngineService,

View File

@@ -190,7 +190,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
stepId: step.id,
},
{
delay: interval * 1000,
delay: interval > 0 ? interval * 1000 : undefined,
jobId: this.getJobId(JobType.RETRY, transaction, step),
removeOnComplete: true,
}
@@ -266,6 +266,9 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
if (step) {
key.push(step.id, step.attempts + "")
if (step.isCompensating()) {
key.push("compensate")
}
}
return key.join(":")

View File

@@ -1,6 +1,7 @@
import {
TransactionStepsDefinition,
WorkflowManager,
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
@@ -121,25 +122,23 @@ function applyStep<
}
const handler = {
invoke: async (transactionContext: WorkflowStepHandlerArguments) => {
const metadata = transactionContext.metadata
invoke: async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
transactionContext.context!.idempotencyKey = idempotencyKey
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
stepName: metadata.action,
action: "invoke",
idempotencyKey,
attempt: metadata.attempt,
container: transactionContext.container,
container: stepArguments.container,
metadata,
context: transactionContext.context!,
context: stepArguments.context!,
}
const argInput = input
? await resolveValue(input, transactionContext)
: {}
const argInput = input ? await resolveValue(input, stepArguments) : {}
const stepResponse: StepResponse<any, any> = await invokeFn.apply(
this,
[argInput, executionContext]
@@ -156,11 +155,11 @@ function applyStep<
}
},
compensate: compensateFn
? async (transactionContext: WorkflowStepHandlerArguments) => {
const metadata = transactionContext.metadata
? async (stepArguments: WorkflowStepHandlerArguments) => {
const metadata = stepArguments.metadata
const idempotencyKey = metadata.idempotency_key
transactionContext.context!.idempotencyKey = idempotencyKey
stepArguments.context!.idempotencyKey = idempotencyKey
const executionContext: StepExecutionContext = {
workflowId: metadata.model_id,
@@ -168,13 +167,12 @@ function applyStep<
action: "compensate",
idempotencyKey,
attempt: metadata.attempt,
container: transactionContext.container,
container: stepArguments.container,
metadata,
context: transactionContext.context!,
context: stepArguments.context!,
}
const stepOutput = (transactionContext.invoke[stepName] as any)
?.output
const stepOutput = (stepArguments.invoke[stepName] as any)?.output
const invokeResult =
stepOutput?.__type ===
OrchestrationUtils.SymbolWorkflowStepResponse
@@ -191,6 +189,8 @@ function applyStep<
: undefined,
}
wrapAsyncHandler(stepConfig, handler)
stepConfig.uuid = ulid()
stepConfig.noCompensation = !compensateFn
@@ -231,6 +231,62 @@ function applyStep<
}
}
/**
* @internal
*
* Internal function to handle async steps to be automatically marked as completed after they are executed.
*
* @param stepConfig
* @param handle
*/
function wrapAsyncHandler(
stepConfig: TransactionStepsDefinition,
handle: {
invoke: WorkflowStepHandler
compensate?: WorkflowStepHandler
}
) {
if (stepConfig.async) {
if (typeof handle.invoke === "function") {
const originalInvoke = handle.invoke
handle.invoke = async (stepArguments: WorkflowStepHandlerArguments) => {
const response = (await originalInvoke(stepArguments)) as any
if (
response?.output?.__type !==
OrchestrationUtils.SymbolWorkflowStepResponse
) {
return
}
stepArguments.step.definition.backgroundExecution = true
return response
}
}
}
if (stepConfig.compensateAsync) {
if (typeof handle.compensate === "function") {
const originalCompensate = handle.compensate!
handle.compensate = async (
stepArguments: WorkflowStepHandlerArguments
) => {
const response = (await originalCompensate(stepArguments)) as any
if (
response?.output?.__type !==
OrchestrationUtils.SymbolWorkflowStepResponse
) {
return
}
stepArguments.step.definition.backgroundExecution = true
return response
}
}
}
}
/**
* This function creates a {@link StepFunction} that can be used as a step in a workflow constructed by the {@link createWorkflow} function.
*