feat(workflows-sdk): Execute workflows as step in other workflows (#7183)
This commit is contained in:
committed by
GitHub
parent
b8a0a459a8
commit
7a351eef09
5
.changeset/gentle-zebras-poke.md
Normal file
5
.changeset/gentle-zebras-poke.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/workflows-sdk": patch
|
||||
---
|
||||
|
||||
feat(workflows-sdk): Execute workflows as step in other workflows
|
||||
@@ -1,11 +1,11 @@
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import {
|
||||
MedusaContext,
|
||||
MedusaContextType,
|
||||
MedusaModuleType,
|
||||
createMedusaContainer,
|
||||
isDefined,
|
||||
isString,
|
||||
MedusaContext,
|
||||
MedusaContextType,
|
||||
MedusaModuleType,
|
||||
} from "@medusajs/utils"
|
||||
import { asValue } from "awilix"
|
||||
import {
|
||||
@@ -29,7 +29,7 @@ type StepHandler = {
|
||||
}
|
||||
|
||||
export class LocalWorkflow {
|
||||
protected container: MedusaContainer
|
||||
protected container_: MedusaContainer
|
||||
protected workflowId: string
|
||||
protected flow: OrchestratorBuilder
|
||||
protected customOptions: Partial<TransactionModelOptions> = {}
|
||||
@@ -37,9 +37,17 @@ export class LocalWorkflow {
|
||||
protected handlers: Map<string, StepHandler>
|
||||
protected medusaContext?: Context
|
||||
|
||||
get container() {
|
||||
return this.container_
|
||||
}
|
||||
|
||||
set container(modulesLoaded: LoadedModule[] | MedusaContainer) {
|
||||
this.resolveContainer(modulesLoaded)
|
||||
}
|
||||
|
||||
constructor(
|
||||
workflowId: string,
|
||||
modulesLoaded: LoadedModule[] | MedusaContainer
|
||||
modulesLoaded?: LoadedModule[] | MedusaContainer
|
||||
) {
|
||||
const globalWorkflow = WorkflowManager.getWorkflow(workflowId)
|
||||
if (!globalWorkflow) {
|
||||
@@ -51,6 +59,10 @@ export class LocalWorkflow {
|
||||
this.workflow = globalWorkflow
|
||||
this.handlers = new Map(globalWorkflow.handlers_)
|
||||
|
||||
this.resolveContainer(modulesLoaded)
|
||||
}
|
||||
|
||||
private resolveContainer(modulesLoaded?: LoadedModule[] | MedusaContainer) {
|
||||
let container
|
||||
|
||||
if (!Array.isArray(modulesLoaded) && modulesLoaded) {
|
||||
@@ -68,7 +80,7 @@ export class LocalWorkflow {
|
||||
}
|
||||
}
|
||||
|
||||
this.container = this.contextualizedMedusaModules(container)
|
||||
this.container_ = this.contextualizedMedusaModules(container)
|
||||
}
|
||||
|
||||
private contextualizedMedusaModules(container) {
|
||||
@@ -326,7 +338,7 @@ export class LocalWorkflow {
|
||||
|
||||
const transaction = await orchestrator.beginTransaction(
|
||||
uniqueTransactionId,
|
||||
handler(this.container, context),
|
||||
handler(this.container_, context),
|
||||
input
|
||||
)
|
||||
|
||||
@@ -349,7 +361,7 @@ export class LocalWorkflow {
|
||||
|
||||
const transaction = await orchestrator.retrieveExistingTransaction(
|
||||
uniqueTransactionId,
|
||||
handler(this.container, context)
|
||||
handler(this.container_, context)
|
||||
)
|
||||
|
||||
return transaction
|
||||
@@ -397,7 +409,7 @@ export class LocalWorkflow {
|
||||
|
||||
const transaction = await orchestrator.registerStepSuccess(
|
||||
idempotencyKey,
|
||||
handler(this.container, context),
|
||||
handler(this.container_, context),
|
||||
undefined,
|
||||
response
|
||||
)
|
||||
@@ -425,7 +437,7 @@ export class LocalWorkflow {
|
||||
const transaction = await orchestrator.registerStepFailure(
|
||||
idempotencyKey,
|
||||
error,
|
||||
handler(this.container, context)
|
||||
handler(this.container_, context)
|
||||
)
|
||||
|
||||
cleanUpEventListeners()
|
||||
|
||||
@@ -5,17 +5,20 @@ import {
|
||||
TransactionStep,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ContainerLike, Context, MedusaContainer } from "@medusajs/types"
|
||||
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
|
||||
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
|
||||
import {
|
||||
MedusaWorkflow,
|
||||
ReturnWorkflow,
|
||||
resolveValue,
|
||||
type FlowRunOptions,
|
||||
MedusaWorkflow,
|
||||
resolveValue,
|
||||
ReturnWorkflow,
|
||||
} from "@medusajs/workflows-sdk"
|
||||
import { ulid } from "ulid"
|
||||
import { InMemoryDistributedTransactionStorage } from "../utils"
|
||||
|
||||
export type WorkflowOrchestratorRunOptions<T> = FlowRunOptions<T> & {
|
||||
export type WorkflowOrchestratorRunOptions<T> = Omit<
|
||||
FlowRunOptions<T>,
|
||||
"container"
|
||||
> & {
|
||||
transactionId?: string
|
||||
container?: ContainerLike
|
||||
}
|
||||
|
||||
@@ -9,9 +9,9 @@ import {
|
||||
import {
|
||||
InjectManager,
|
||||
InjectSharedContext,
|
||||
isString,
|
||||
MedusaContext,
|
||||
MedusaError,
|
||||
isString,
|
||||
} from "@medusajs/utils"
|
||||
import type {
|
||||
IWorkflowEngineService,
|
||||
@@ -191,7 +191,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
|
||||
transactionId: string,
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
return this.workflowOrchestratorService_.getRunningTransaction(
|
||||
return await this.workflowOrchestratorService_.getRunningTransaction(
|
||||
workflowId,
|
||||
transactionId,
|
||||
context
|
||||
@@ -211,7 +211,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
|
||||
},
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
return this.workflowOrchestratorService_.setStepSuccess(
|
||||
return await this.workflowOrchestratorService_.setStepSuccess(
|
||||
{
|
||||
idempotencyKey,
|
||||
stepResponse,
|
||||
@@ -234,7 +234,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
|
||||
},
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
return this.workflowOrchestratorService_.setStepFailure(
|
||||
return await this.workflowOrchestratorService_.setStepFailure(
|
||||
{
|
||||
idempotencyKey,
|
||||
stepResponse,
|
||||
|
||||
@@ -10,18 +10,21 @@ import {
|
||||
Logger,
|
||||
MedusaContainer,
|
||||
} from "@medusajs/types"
|
||||
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
|
||||
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
|
||||
import {
|
||||
FlowRunOptions,
|
||||
MedusaWorkflow,
|
||||
ReturnWorkflow,
|
||||
resolveValue,
|
||||
ReturnWorkflow,
|
||||
} from "@medusajs/workflows-sdk"
|
||||
import Redis from "ioredis"
|
||||
import { ulid } from "ulid"
|
||||
import type { RedisDistributedTransactionStorage } from "../utils"
|
||||
|
||||
export type WorkflowOrchestratorRunOptions<T> = FlowRunOptions<T> & {
|
||||
export type WorkflowOrchestratorRunOptions<T> = Omit<
|
||||
FlowRunOptions<T>,
|
||||
"container"
|
||||
> & {
|
||||
transactionId?: string
|
||||
container?: ContainerLike
|
||||
}
|
||||
|
||||
@@ -2,3 +2,4 @@ export * from "./merge-data"
|
||||
export * from "./empty-handler"
|
||||
export * from "./pipe"
|
||||
export * from "./workflow-export"
|
||||
export * from "./type"
|
||||
|
||||
134
packages/workflows-sdk/src/helper/type.ts
Normal file
134
packages/workflows-sdk/src/helper/type.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionEvents,
|
||||
LocalWorkflow,
|
||||
TransactionStepError,
|
||||
} from "@medusajs/orchestration"
|
||||
|
||||
export type FlowRunOptions<TData = unknown> = {
|
||||
input?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type FlowRegisterStepSuccessOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type FlowRegisterStepFailureOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type FlowCancelOptions = {
|
||||
transaction?: DistributedTransaction
|
||||
transactionId?: string
|
||||
context?: Context
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type WorkflowResult<TResult = unknown> = {
|
||||
errors: TransactionStepError[]
|
||||
transaction: DistributedTransaction
|
||||
result: TResult
|
||||
}
|
||||
|
||||
export type ExportedWorkflow<
|
||||
TData = unknown,
|
||||
TResult = unknown,
|
||||
TDataOverride = undefined,
|
||||
TResultOverride = undefined
|
||||
> = {
|
||||
run: (
|
||||
args?: FlowRunOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
registerStepSuccess: (
|
||||
args?: FlowRegisterStepSuccessOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
registerStepFailure: (
|
||||
args?: FlowRegisterStepFailureOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
cancel: (args?: FlowCancelOptions) => Promise<WorkflowResult>
|
||||
}
|
||||
|
||||
export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
|
||||
// Main function on the exported workflow
|
||||
<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
): Omit<
|
||||
LocalWorkflow,
|
||||
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
|
||||
> &
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
|
||||
} & {
|
||||
/**
|
||||
* You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow
|
||||
*/
|
||||
|
||||
run<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRunOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
registerStepSuccess<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRegisterStepSuccessOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
registerStepFailure<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRegisterStepFailureOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
cancel(args?: FlowCancelOptions): Promise<WorkflowResult>
|
||||
}
|
||||
@@ -1,12 +1,9 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionEvents,
|
||||
LocalWorkflow,
|
||||
TransactionHandlerType,
|
||||
TransactionState,
|
||||
TransactionStepError,
|
||||
} from "@medusajs/orchestration"
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import { LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
|
||||
import { MedusaModule } from "@medusajs/modules-sdk"
|
||||
import { MedusaContextType } from "@medusajs/utils"
|
||||
@@ -14,139 +11,15 @@ import { EOL } from "os"
|
||||
import { ulid } from "ulid"
|
||||
import { MedusaWorkflow } from "../medusa-workflow"
|
||||
import { resolveValue } from "../utils/composer"
|
||||
|
||||
export type FlowRunOptions<TData = unknown> = {
|
||||
input?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
}
|
||||
|
||||
export type FlowRegisterStepSuccessOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
}
|
||||
|
||||
export type FlowRegisterStepFailureOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
}
|
||||
|
||||
export type FlowCancelOptions = {
|
||||
transaction?: DistributedTransaction
|
||||
transactionId?: string
|
||||
context?: Context
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
}
|
||||
|
||||
export type WorkflowResult<TResult = unknown> = {
|
||||
errors: TransactionStepError[]
|
||||
transaction: DistributedTransaction
|
||||
result: TResult
|
||||
}
|
||||
|
||||
export type ExportedWorkflow<
|
||||
TData = unknown,
|
||||
TResult = unknown,
|
||||
TDataOverride = undefined,
|
||||
TResultOverride = undefined
|
||||
> = {
|
||||
run: (
|
||||
args?: FlowRunOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
registerStepSuccess: (
|
||||
args?: FlowRegisterStepSuccessOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
registerStepFailure: (
|
||||
args?: FlowRegisterStepFailureOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
>
|
||||
) => Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
cancel: (args?: FlowCancelOptions) => Promise<WorkflowResult>
|
||||
}
|
||||
|
||||
export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
|
||||
// Main function on the exported workflow
|
||||
<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
): Omit<
|
||||
LocalWorkflow,
|
||||
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
|
||||
> &
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
|
||||
|
||||
/**
|
||||
* You can also directly call run, registerStepSuccess, registerStepFailure and cancel on the exported workflow
|
||||
*/
|
||||
|
||||
run<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRunOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
registerStepSuccess<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRegisterStepSuccessOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
registerStepFailure<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
args?: FlowRegisterStepFailureOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
>
|
||||
>
|
||||
|
||||
cancel(
|
||||
args?: FlowCancelOptions & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
): Promise<WorkflowResult>
|
||||
}
|
||||
import {
|
||||
ExportedWorkflow,
|
||||
FlowCancelOptions,
|
||||
FlowRegisterStepFailureOptions,
|
||||
FlowRegisterStepSuccessOptions,
|
||||
FlowRunOptions,
|
||||
MainExportedWorkflow,
|
||||
WorkflowResult,
|
||||
} from "./type"
|
||||
|
||||
function createContextualWorkflowRunner<
|
||||
TData = unknown,
|
||||
@@ -172,12 +45,6 @@ function createContextualWorkflowRunner<
|
||||
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
|
||||
> &
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
|
||||
if (!container) {
|
||||
container = MedusaModule.getLoadedModules().map(
|
||||
(mod) => Object.values(mod)[0]
|
||||
)
|
||||
}
|
||||
|
||||
const flow = new LocalWorkflow(workflowId, container!)
|
||||
|
||||
const originalRun = flow.run.bind(flow)
|
||||
@@ -187,9 +54,24 @@ function createContextualWorkflowRunner<
|
||||
|
||||
const originalExecution = async (
|
||||
method,
|
||||
{ throwOnError, resultFrom, isCancel = false },
|
||||
{
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
isCancel = false,
|
||||
container: executionContainer,
|
||||
},
|
||||
...args
|
||||
) => {
|
||||
if (!executionContainer && !flow.container) {
|
||||
executionContainer = MedusaModule.getLoadedModules().map(
|
||||
(mod) => Object.values(mod)[0]
|
||||
)
|
||||
}
|
||||
|
||||
if (!flow.container) {
|
||||
flow.container = executionContainer
|
||||
}
|
||||
|
||||
const transaction = await method.apply(method, args)
|
||||
|
||||
let errors = transaction.getErrors(TransactionHandlerType.INVOKE)
|
||||
@@ -236,6 +118,7 @@ function createContextualWorkflowRunner<
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRunOptions = {
|
||||
throwOnError: true,
|
||||
resultFrom: defaultResult,
|
||||
@@ -269,7 +152,7 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRun,
|
||||
{ throwOnError, resultFrom },
|
||||
{ throwOnError, resultFrom, container },
|
||||
context.transactionId,
|
||||
input,
|
||||
context,
|
||||
@@ -286,6 +169,7 @@ function createContextualWorkflowRunner<
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRegisterStepSuccessOptions = {
|
||||
idempotencyKey: "",
|
||||
throwOnError: true,
|
||||
@@ -304,7 +188,7 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRegisterStepSuccess,
|
||||
{ throwOnError, resultFrom },
|
||||
{ throwOnError, resultFrom, container },
|
||||
idempotencyKey,
|
||||
response,
|
||||
context,
|
||||
@@ -321,6 +205,7 @@ function createContextualWorkflowRunner<
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRegisterStepFailureOptions = {
|
||||
idempotencyKey: "",
|
||||
throwOnError: true,
|
||||
@@ -339,7 +224,7 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRegisterStepFailure,
|
||||
{ throwOnError, resultFrom },
|
||||
{ throwOnError, resultFrom, container },
|
||||
idempotencyKey,
|
||||
response,
|
||||
context,
|
||||
@@ -355,6 +240,7 @@ function createContextualWorkflowRunner<
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
events,
|
||||
container,
|
||||
}: FlowCancelOptions = {
|
||||
throwOnError: true,
|
||||
}
|
||||
@@ -373,6 +259,7 @@ function createContextualWorkflowRunner<
|
||||
throwOnError,
|
||||
resultFrom: undefined,
|
||||
isCancel: true,
|
||||
container,
|
||||
},
|
||||
transaction ?? transactionId,
|
||||
context,
|
||||
@@ -397,6 +284,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
TDataOverride = undefined,
|
||||
TResultOverride = undefined
|
||||
>(
|
||||
// TODO: rm when all usage have been migrated
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
): Omit<
|
||||
LocalWorkflow,
|
||||
@@ -456,9 +344,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
>(
|
||||
args?: FlowRunOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
@@ -482,9 +368,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
>(
|
||||
args?: FlowRegisterStepSuccessOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
@@ -512,9 +396,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
>(
|
||||
args?: FlowRegisterStepFailureOptions<
|
||||
TDataOverride extends undefined ? TData : TDataOverride
|
||||
> & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
>
|
||||
): Promise<
|
||||
WorkflowResult<
|
||||
TResultOverride extends undefined ? TResult : TResultOverride
|
||||
@@ -537,9 +419,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
}
|
||||
|
||||
exportedWorkflow.cancel = async (
|
||||
args?: FlowCancelOptions & {
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
args?: FlowCancelOptions
|
||||
): Promise<WorkflowResult> => {
|
||||
const container = args?.container
|
||||
delete args?.container
|
||||
@@ -552,5 +432,5 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
}
|
||||
|
||||
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)
|
||||
return exportedWorkflow
|
||||
return exportedWorkflow as MainExportedWorkflow<TData, TResult>
|
||||
}
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
import { createStep, createWorkflow, StepResponse } from "./composer"
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
StepResponse,
|
||||
WorkflowData,
|
||||
} from "./composer"
|
||||
|
||||
const step1 = createStep("step1", async (input: {}, context) => {
|
||||
return new StepResponse({ step1: ["step1"] })
|
||||
return new StepResponse({ step1: "step1" })
|
||||
})
|
||||
|
||||
type Step2Input = { filters: { id: string[] } } | { filters: { id: string } }
|
||||
@@ -13,17 +18,22 @@ const step3 = createStep("step3", async () => {
|
||||
return new StepResponse({ step3: "step3" })
|
||||
})
|
||||
|
||||
const workflow = createWorkflow("workflow", function () {
|
||||
const step1Res = step1()
|
||||
step3()
|
||||
return step2({ filters: { id: step1Res.step1 } })
|
||||
})
|
||||
const workflow = createWorkflow(
|
||||
"sub-workflow",
|
||||
function (input: WorkflowData<{ outsideWorkflowData: string }>) {
|
||||
step1()
|
||||
step3()
|
||||
return step2({ filters: { id: input.outsideWorkflowData } })
|
||||
}
|
||||
)
|
||||
|
||||
const workflow2 = createWorkflow("workflow", function () {
|
||||
const step1Res = step1()
|
||||
step3()
|
||||
workflow()
|
||||
return step2({ filters: { id: step1Res.step1 } })
|
||||
|
||||
const workflowRes = workflow.asStep({ outsideWorkflowData: step1Res.step1 })
|
||||
|
||||
return workflowRes
|
||||
})
|
||||
|
||||
workflow2()
|
||||
|
||||
@@ -2,8 +2,109 @@ import { createStep } from "../create-step"
|
||||
import { createWorkflow } from "../create-workflow"
|
||||
import { StepResponse } from "../helpers"
|
||||
import { transform } from "../transform"
|
||||
import { WorkflowData } from "../type"
|
||||
|
||||
let count = 1
|
||||
const getNewWorkflowId = () => `workflow-${count++}`
|
||||
|
||||
describe("Workflow composer", () => {
|
||||
describe("running sub workflows", () => {
|
||||
it("should succeed", async function () {
|
||||
const step1 = createStep("step1", async () => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
})
|
||||
const step2 = createStep("step2", async (input: string) => {
|
||||
return new StepResponse({ result: input })
|
||||
})
|
||||
const step3 = createStep("step3", async (input: string) => {
|
||||
return new StepResponse({ result: input })
|
||||
})
|
||||
|
||||
const subWorkflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
function (input: WorkflowData<string>) {
|
||||
step1()
|
||||
return step2(input)
|
||||
}
|
||||
)
|
||||
|
||||
const workflow = createWorkflow(getNewWorkflowId(), function () {
|
||||
const subWorkflowRes = subWorkflow.runAsStep({
|
||||
input: "hi from outside",
|
||||
})
|
||||
return step3(subWorkflowRes.result)
|
||||
})
|
||||
|
||||
const { result } = await workflow.run({ input: {} })
|
||||
|
||||
expect(result).toEqual({ result: "hi from outside" })
|
||||
})
|
||||
|
||||
it("should revert the workflow and sub workflow on failure", async function () {
|
||||
const step1Mock = jest.fn()
|
||||
const step1 = createStep(
|
||||
"step1",
|
||||
async () => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
},
|
||||
step1Mock
|
||||
)
|
||||
|
||||
const step2Mock = jest.fn()
|
||||
const step2 = createStep(
|
||||
"step2",
|
||||
async (input: string) => {
|
||||
return new StepResponse({ result: input })
|
||||
},
|
||||
step2Mock
|
||||
)
|
||||
|
||||
const step3Mock = jest.fn()
|
||||
const step3 = createStep(
|
||||
"step3",
|
||||
async () => {
|
||||
return new StepResponse()
|
||||
},
|
||||
step3Mock
|
||||
)
|
||||
|
||||
const step4WithError = createStep("step4", async () => {
|
||||
throw new Error("Step4 failed")
|
||||
})
|
||||
|
||||
const subWorkflow = createWorkflow(
|
||||
getNewWorkflowId(),
|
||||
function (input: WorkflowData<string>) {
|
||||
step1()
|
||||
return step2(input)
|
||||
}
|
||||
)
|
||||
|
||||
const workflow = createWorkflow(getNewWorkflowId(), function () {
|
||||
step3()
|
||||
const subWorkflowRes = subWorkflow.runAsStep({
|
||||
input: "hi from outside",
|
||||
})
|
||||
step4WithError()
|
||||
return subWorkflowRes
|
||||
})
|
||||
|
||||
const { errors } = await workflow.run({ throwOnError: false })
|
||||
|
||||
expect(errors).toEqual([
|
||||
expect.objectContaining({
|
||||
error: expect.objectContaining({
|
||||
message: "Step4 failed",
|
||||
}),
|
||||
}),
|
||||
])
|
||||
|
||||
expect(step1Mock).toHaveBeenCalledTimes(1)
|
||||
expect(step2Mock).toHaveBeenCalledTimes(1)
|
||||
expect(step3Mock).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
})
|
||||
|
||||
it("should not throw an unhandled error on failed transformer resolution after a step fail, but should rather push the errors in the errors result", async function () {
|
||||
const step1 = createStep("step1", async () => {
|
||||
return new StepResponse({ result: "step1" })
|
||||
@@ -25,7 +126,7 @@ describe("Workflow composer", () => {
|
||||
})
|
||||
})
|
||||
|
||||
const { errors } = await work().run({ input: {}, throwOnError: false })
|
||||
const { errors } = await work.run({ input: {}, throwOnError: false })
|
||||
|
||||
expect(errors).toEqual([
|
||||
{
|
||||
|
||||
@@ -1,96 +1,24 @@
|
||||
import {
|
||||
LocalWorkflow,
|
||||
TransactionModelOptions,
|
||||
WorkflowHandler,
|
||||
WorkflowManager,
|
||||
} from "@medusajs/orchestration"
|
||||
import { LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import { isString, OrchestrationUtils } from "@medusajs/utils"
|
||||
import { ExportedWorkflow, exportWorkflow } from "../../helper"
|
||||
import { exportWorkflow } from "../../helper"
|
||||
import { proxify } from "./helpers/proxy"
|
||||
import {
|
||||
CreateWorkflowComposerContext,
|
||||
ReturnWorkflow,
|
||||
StepFunction,
|
||||
WorkflowData,
|
||||
WorkflowDataProperties,
|
||||
} from "./type"
|
||||
import { createStep } from "./create-step"
|
||||
import { StepResponse } from "./helpers"
|
||||
|
||||
global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
|
||||
|
||||
/**
|
||||
* An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create
|
||||
* an executable workflow, optionally within a specified container. So, to execute the workflow, you must invoke the exported workflow, then run the
|
||||
* `run` method of the exported workflow.
|
||||
*
|
||||
* @example
|
||||
* To execute a workflow:
|
||||
*
|
||||
* ```ts
|
||||
* myWorkflow()
|
||||
* .run({
|
||||
* input: {
|
||||
* name: "John"
|
||||
* }
|
||||
* })
|
||||
* .then(({ result }) => {
|
||||
* console.log(result)
|
||||
* })
|
||||
* ```
|
||||
*
|
||||
* To specify the container of the workflow, you can pass it as an argument to the call of the exported workflow. This is necessary when executing the workflow
|
||||
* within a Medusa resource such as an API Route or a Subscriber.
|
||||
*
|
||||
* For example:
|
||||
*
|
||||
* ```ts
|
||||
* import type {
|
||||
* MedusaRequest,
|
||||
* MedusaResponse
|
||||
* } from "@medusajs/medusa";
|
||||
* import myWorkflow from "../../../workflows/hello-world";
|
||||
*
|
||||
* export async function GET(
|
||||
* req: MedusaRequest,
|
||||
* res: MedusaResponse
|
||||
* ) {
|
||||
* const { result } = await myWorkflow(req.scope)
|
||||
* .run({
|
||||
* input: {
|
||||
* name: req.query.name as string
|
||||
* }
|
||||
* })
|
||||
*
|
||||
* res.send(result)
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export type ReturnWorkflow<
|
||||
TData,
|
||||
TResult,
|
||||
THooks extends Record<string, Function>
|
||||
> = {
|
||||
<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
): Omit<
|
||||
LocalWorkflow,
|
||||
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
|
||||
> &
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
|
||||
} & THooks & {
|
||||
getName: () => string
|
||||
} & {
|
||||
config: (config: TransactionModelOptions) => void
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the raw type of the expected input data of a workflow.
|
||||
*
|
||||
* @example
|
||||
* type WorkflowInputData = UnwrapWorkflowInputDataType<typeof myWorkflow>
|
||||
*/
|
||||
export type UnwrapWorkflowInputDataType<
|
||||
T extends ReturnWorkflow<any, any, any>
|
||||
> = T extends ReturnWorkflow<infer TData, infer R, infer THooks> ? TData : never
|
||||
|
||||
/**
|
||||
* This function creates a workflow with the provided name and a constructor function.
|
||||
* The constructor function builds the workflow from steps created by the {@link createStep} function.
|
||||
@@ -256,5 +184,38 @@ export function createWorkflow<
|
||||
|
||||
mainFlow.getName = () => name
|
||||
|
||||
mainFlow.run = mainFlow().run
|
||||
|
||||
mainFlow.runAsStep = ({
|
||||
input,
|
||||
}: {
|
||||
input: TData
|
||||
}): ReturnType<StepFunction<TData, TResult>> => {
|
||||
// TODO: Async sub workflow is not supported yet
|
||||
// Info: Once the export workflow can fire the execution through the engine if loaded, the async workflow can be executed,
|
||||
// the step would inherit the async configuration and subscribe to the onFinish event of the sub worklow and mark itself as success or failure
|
||||
return createStep(
|
||||
`${name}-as-step`,
|
||||
async (stepInput: TData, stepContext) => {
|
||||
const { container, ...sharedContext } = stepContext
|
||||
|
||||
const transaction = await workflow.run({
|
||||
input: stepInput as any,
|
||||
container,
|
||||
context: sharedContext,
|
||||
})
|
||||
|
||||
return new StepResponse(transaction.result, transaction)
|
||||
},
|
||||
async (transaction, { container }) => {
|
||||
if (!transaction) {
|
||||
return
|
||||
}
|
||||
|
||||
await workflow(container).cancel(transaction)
|
||||
}
|
||||
)(input) as ReturnType<StepFunction<TData, TResult>>
|
||||
}
|
||||
|
||||
return mainFlow as ReturnWorkflow<TData, TResult, THooks>
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { PermanentStepFailureError } from "@medusajs/orchestration"
|
||||
import { OrchestrationUtils } from "@medusajs/utils"
|
||||
import { isDefined, OrchestrationUtils } from "@medusajs/utils"
|
||||
|
||||
/**
|
||||
* This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`.
|
||||
@@ -26,13 +26,15 @@ export class StepResponse<TOutput, TCompensateInput = TOutput> {
|
||||
/**
|
||||
* The output of the step.
|
||||
*/
|
||||
output: TOutput,
|
||||
output?: TOutput,
|
||||
/**
|
||||
* The input to be passed as a parameter to the step's compensation function. If not provided, the `output` will be provided instead.
|
||||
*/
|
||||
compensateInput?: TCompensateInput
|
||||
) {
|
||||
this.#output = output
|
||||
if (isDefined(output)) {
|
||||
this.#output = output
|
||||
}
|
||||
this.#compensateInput = (compensateInput ?? output) as TCompensateInput
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import {
|
||||
LocalWorkflow,
|
||||
OrchestratorBuilder,
|
||||
TransactionContext as OriginalWorkflowTransactionContext,
|
||||
TransactionModelOptions,
|
||||
TransactionPayload,
|
||||
TransactionStepsDefinition,
|
||||
WorkflowHandler,
|
||||
} from "@medusajs/orchestration"
|
||||
import { Context, MedusaContainer } from "@medusajs/types"
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import { ExportedWorkflow } from "../../helper"
|
||||
|
||||
export type StepFunctionResult<TOutput extends unknown | unknown[] = unknown> =
|
||||
(this: CreateWorkflowComposerContext) => WorkflowData<TOutput>
|
||||
@@ -134,3 +137,89 @@ export type WorkflowTransactionContext = StepExecutionContext &
|
||||
OriginalWorkflowTransactionContext & {
|
||||
invoke: { [key: string]: { output: any } }
|
||||
}
|
||||
|
||||
/**
|
||||
* An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create
|
||||
* an executable workflow, optionally within a specified container. So, to execute the workflow, you must invoke the exported workflow, then run the
|
||||
* `run` method of the exported workflow.
|
||||
*
|
||||
* @example
|
||||
* To execute a workflow:
|
||||
*
|
||||
* ```ts
|
||||
* myWorkflow()
|
||||
* .run({
|
||||
* input: {
|
||||
* name: "John"
|
||||
* }
|
||||
* })
|
||||
* .then(({ result }) => {
|
||||
* console.log(result)
|
||||
* })
|
||||
* ```
|
||||
*
|
||||
* To specify the container of the workflow, you can pass it as an argument to the call of the exported workflow. This is necessary when executing the workflow
|
||||
* within a Medusa resource such as an API Route or a Subscriber.
|
||||
*
|
||||
* For example:
|
||||
*
|
||||
* ```ts
|
||||
* import type {
|
||||
* MedusaRequest,
|
||||
* MedusaResponse
|
||||
* } from "@medusajs/medusa";
|
||||
* import myWorkflow from "../../../workflows/hello-world";
|
||||
*
|
||||
* export async function GET(
|
||||
* req: MedusaRequest,
|
||||
* res: MedusaResponse
|
||||
* ) {
|
||||
* const { result } = await myWorkflow(req.scope)
|
||||
* .run({
|
||||
* input: {
|
||||
* name: req.query.name as string
|
||||
* }
|
||||
* })
|
||||
*
|
||||
* res.send(result)
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export type ReturnWorkflow<
|
||||
TData,
|
||||
TResult,
|
||||
THooks extends Record<string, Function>
|
||||
> = {
|
||||
<TDataOverride = undefined, TResultOverride = undefined>(
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
): Omit<
|
||||
LocalWorkflow,
|
||||
"run" | "registerStepSuccess" | "registerStepFailure" | "cancel"
|
||||
> &
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
|
||||
} & THooks & {
|
||||
runAsStep: ({
|
||||
input,
|
||||
}: {
|
||||
input: TData
|
||||
}) => ReturnType<StepFunction<TData, TResult>>
|
||||
run: <TDataOverride = undefined, TResultOverride = undefined>(
|
||||
...args: Parameters<
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>["run"]
|
||||
>
|
||||
) => ReturnType<
|
||||
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>["run"]
|
||||
>
|
||||
getName: () => string
|
||||
config: (config: TransactionModelOptions) => void
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the raw type of the expected input data of a workflow.
|
||||
*
|
||||
* @example
|
||||
* type WorkflowInputData = UnwrapWorkflowInputDataType<typeof myWorkflow>
|
||||
*/
|
||||
export type UnwrapWorkflowInputDataType<
|
||||
T extends ReturnWorkflow<any, any, any>
|
||||
> = T extends ReturnWorkflow<infer TData, infer R, infer THooks> ? TData : never
|
||||
|
||||
Reference in New Issue
Block a user