feat(workflows-sdk): run, registerStepSuccess and registerStepFailure shortcut (#6164)

**What**

Currently, when exporting a workflow using the workflowExport util, it is mandatory to first call the resulted function passing the container before being able to call run, registerStepSuccess or failure on it. Now, it is possible to either continue that way, or to directly call the run, registerStepSuccess or failure on the exported workflow and at that moment it is possible to pass a container if needed

e.g
```ts
const workflow = exportWorkflow("id" as any, "result_step", prepare)
const wfRunner = workflow(container)
wfRunner.run(...) // Here the container is not expected
```
or
```ts
const workflow = exportWorkflow("id" as any, "result_step", prepare)
workflow.run(...) // Here we can now pass an optional container 
```
This commit is contained in:
Adrien de Peretti
2024-01-22 20:14:18 +01:00
committed by GitHub
parent 4792c55226
commit 738e9115ec
3 changed files with 375 additions and 131 deletions

View File

@@ -0,0 +1,5 @@
---
"@medusajs/workflows-sdk": patch
---
feat(workflows-sdk): run, registerStepSuccess and registerStepFailure shortcut

View File

@@ -1,4 +1,5 @@
import { exportWorkflow } from "../workflow-export"
import { createMedusaContainer } from "@medusajs/utils"
jest.mock("@medusajs/orchestration", () => {
return {
@@ -62,7 +63,8 @@ describe("Export Workflow", function () {
const work = exportWorkflow("id" as any, "result_step", prepare)
const wfHandler = work()
const container = createMedusaContainer()
const wfHandler = work(container)
const input = {
test: "payload",
@@ -83,4 +85,40 @@ describe("Export Workflow", function () {
expect(result).toEqual("invoke_test")
})
describe("Using the exported workflow run", function () {
it("should prepare the input data before initializing the transaction", async function () {
let transformedInput
const prepare = jest.fn().mockImplementation(async (data) => {
data.__transformed = true
transformedInput = data
return data
})
const work = exportWorkflow("id" as any, "result_step", prepare)
const input = {
test: "payload",
}
const container = createMedusaContainer()
const { result } = await work.run({
input,
container,
})
expect(input).toEqual({
test: "payload",
})
expect(transformedInput).toEqual({
test: "payload",
__transformed: true,
})
expect(result).toEqual("invoke_test")
})
})
})

View File

@@ -81,6 +81,217 @@ export type ExportedWorkflow<
>
}
export type MainExportedWorkflow<TData = unknown, TResult = unknown> = {
// Main function on the exported workflow
<TDataOverride = undefined, TResultOverride = undefined>(
container?: LoadedModule[] | MedusaContainer
): Omit<
LocalWorkflow,
"run" | "registerStepSuccess" | "registerStepFailure"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
/**
* You can also directly call run, registerStepSuccess and registerStepFailure on the exported workflow
*/
run<TDataOverride = undefined, TResultOverride = undefined>(
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
registerStepSuccess<TDataOverride = undefined, TResultOverride = undefined>(
args?: FlowRegisterStepSuccessOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
registerStepFailure<TDataOverride = undefined, TResultOverride = undefined>(
args?: FlowRegisterStepFailureOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
>
}
function createContextualWorkflowRunner<
TData = unknown,
TResult = unknown,
TDataOverride = undefined,
TResultOverride = undefined
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
}: {
workflowId: string
defaultResult?: string | Symbol
dataPreparation?: (data: TData) => Promise<unknown>
options?: {
wrappedInput?: boolean
}
container?: LoadedModule[] | MedusaContainer
}): Omit<LocalWorkflow, "run" | "registerStepSuccess" | "registerStepFailure"> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
if (!container) {
container = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
)
}
const flow = new LocalWorkflow(workflowId, container)
const originalRun = flow.run.bind(flow)
const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow)
const originalRegisterStepFailure = flow.registerStepFailure.bind(flow)
const originalExecution = async (
method,
{ throwOnError, resultFrom },
...args
) => {
const transaction = await method.apply(method, args)
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
throw new Error(errorMessage)
}
let result
if (options?.wrappedInput) {
result = await resolveValue(resultFrom, transaction.getContext())
} else {
result = transaction.getContext().invoke?.[resultFrom]
}
return {
errors,
transaction,
result,
}
}
const newRun = async (
{ input, context, throwOnError, resultFrom, events }: FlowRunOptions = {
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}
return await originalExecution(
originalRun,
{ throwOnError, resultFrom },
context?.transactionId ?? ulid(),
input,
context,
events
)
}
flow.run = newRun as any
const newRegisterStepSuccess = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepSuccessOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepSuccess,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepSuccess = newRegisterStepSuccess as any
const newRegisterStepFailure = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepFailureOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepFailure,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepFailure = newRegisterStepFailure as any
return flow as unknown as LocalWorkflow &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
}
export const exportWorkflow = <TData = unknown, TResult = unknown>(
workflowId: string,
defaultResult?: string | Symbol,
@@ -88,7 +299,7 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
options?: {
wrappedInput?: boolean
}
) => {
): MainExportedWorkflow<TData, TResult> => {
function exportedWorkflow<
TDataOverride = undefined,
TResultOverride = undefined
@@ -99,143 +310,133 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
"run" | "registerStepSuccess" | "registerStepFailure"
> &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride> {
if (!container) {
container = MedusaModule.getLoadedModules().map(
(mod) => Object.values(mod)[0]
)
return createContextualWorkflowRunner<
TData,
TResult,
TDataOverride,
TResultOverride
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
}
const buildRunnerFn = <
TAction extends "run" | "registerStepSuccess" | "registerStepFailure",
TDataOverride,
TResultOverride
>(
action: "run" | "registerStepSuccess" | "registerStepFailure",
container?: LoadedModule[] | MedusaContainer
) => {
const contextualRunner = createContextualWorkflowRunner<
TData,
TResult,
TDataOverride,
TResultOverride
>({
workflowId,
defaultResult,
dataPreparation,
options,
container,
})
return contextualRunner[action] as ExportedWorkflow<
TData,
TResult,
TDataOverride,
TResultOverride
>[TAction]
}
exportedWorkflow.run = async <
TDataOverride = undefined,
TResultOverride = undefined
>(
args?: FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRunOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
const flow = new LocalWorkflow(workflowId, container)
return await buildRunnerFn<"run", TDataOverride, TResultOverride>(
"run",
container
)(inputArgs)
}
const originalRun = flow.run.bind(flow)
const originalRegisterStepSuccess = flow.registerStepSuccess.bind(flow)
const originalRegisterStepFailure = flow.registerStepFailure.bind(flow)
const originalExecution = async (
method,
{ throwOnError, resultFrom },
...args
) => {
const transaction = await method.apply(method, args)
const errors = transaction.getErrors(TransactionHandlerType.INVOKE)
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState()) && throwOnError) {
const errorMessage = errors
?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`)
?.join(`${EOL}`)
throw new Error(errorMessage)
}
let result
if (options?.wrappedInput) {
result = await resolveValue(resultFrom, transaction.getContext())
} else {
result = transaction.getContext().invoke?.[resultFrom]
}
return {
errors,
transaction,
result,
}
exportedWorkflow.registerStepSuccess = async <
TDataOverride = undefined,
TResultOverride = undefined
>(
args?: FlowRegisterStepSuccessOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRegisterStepSuccessOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
const newRun = async (
{ input, context, throwOnError, resultFrom, events }: FlowRunOptions = {
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await buildRunnerFn<
"registerStepSuccess",
TDataOverride,
TResultOverride
>(
"registerStepSuccess",
container
)(inputArgs)
}
if (typeof dataPreparation === "function") {
try {
const copyInput = input ? JSON.parse(JSON.stringify(input)) : input
input = await dataPreparation(copyInput as TData)
} catch (err) {
if (throwOnError) {
throw new Error(
`Data preparation failed: ${err.message}${EOL}${err.stack}`
)
}
return {
errors: [err],
}
}
}
return await originalExecution(
originalRun,
{ throwOnError, resultFrom },
context?.transactionId ?? ulid(),
input,
context,
events
)
exportedWorkflow.registerStepFailure = async <
TDataOverride = undefined,
TResultOverride = undefined
>(
args?: FlowRegisterStepFailureOptions<
TDataOverride extends undefined ? TData : TDataOverride
> & {
container?: LoadedModule[] | MedusaContainer
}
flow.run = newRun as any
): Promise<
WorkflowResult<
TResultOverride extends undefined ? TResult : TResultOverride
>
> => {
const container = args?.container
delete args?.container
const inputArgs = { ...args } as FlowRegisterStepFailureOptions<
TDataOverride extends undefined ? TData : TDataOverride
>
const newRegisterStepSuccess = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepSuccessOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepSuccess,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepSuccess = newRegisterStepSuccess as any
const newRegisterStepFailure = async (
{
response,
idempotencyKey,
context,
throwOnError,
resultFrom,
events,
}: FlowRegisterStepFailureOptions = {
idempotencyKey: "",
throwOnError: true,
resultFrom: defaultResult,
}
) => {
resultFrom ??= defaultResult
throwOnError ??= true
return await originalExecution(
originalRegisterStepFailure,
{ throwOnError, resultFrom },
idempotencyKey,
response,
context,
events
)
}
flow.registerStepFailure = newRegisterStepFailure as any
return flow as unknown as LocalWorkflow &
ExportedWorkflow<TData, TResult, TDataOverride, TResultOverride>
return await buildRunnerFn<
"registerStepFailure",
TDataOverride,
TResultOverride
>(
"registerStepFailure",
container
)(inputArgs)
}
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)