chore(workflow-engine): export cancel method (#11844)
What: * Workflow engine exports the method `cancel` to revert a workflow.
This commit is contained in:
committed by
GitHub
parent
3db146c56e
commit
0625f76cd4
@@ -0,0 +1,8 @@
|
||||
---
|
||||
"@medusajs/workflow-engine-inmemory": patch
|
||||
"@medusajs/workflow-engine-redis": patch
|
||||
"@medusajs/orchestration": patch
|
||||
"@medusajs/types": patch
|
||||
---
|
||||
|
||||
chore(workflow-engine): expose cancel method
|
||||
@@ -20,6 +20,7 @@ import {
|
||||
import {
|
||||
isDefined,
|
||||
isErrorLike,
|
||||
isObject,
|
||||
MedusaError,
|
||||
promiseAll,
|
||||
serializeError,
|
||||
@@ -188,6 +189,7 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
TransactionStepState.DORMANT,
|
||||
TransactionStepState.SKIPPED,
|
||||
]
|
||||
|
||||
const siblings = step.next.map((sib) => flow.steps[sib])
|
||||
return (
|
||||
siblings.length === 0 ||
|
||||
@@ -1208,70 +1210,72 @@ export class TransactionOrchestrator extends EventEmitter {
|
||||
while (queue.length > 0) {
|
||||
const { obj, level } = queue.shift()
|
||||
|
||||
for (const key of Object.keys(obj)) {
|
||||
if (typeof obj[key] === "object" && obj[key] !== null) {
|
||||
queue.push({ obj: obj[key], level: [...level] })
|
||||
} else if (key === "action") {
|
||||
if (actionNames.has(obj.action)) {
|
||||
throw new Error(
|
||||
`Step ${obj.action} is already defined in workflow.`
|
||||
)
|
||||
}
|
||||
|
||||
actionNames.add(obj.action)
|
||||
level.push(obj.action)
|
||||
const id = level.join(".")
|
||||
const parent = level.slice(0, level.length - 1).join(".")
|
||||
|
||||
if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) {
|
||||
states[parent].next?.push(id)
|
||||
}
|
||||
|
||||
const definitionCopy = { ...obj }
|
||||
delete definitionCopy.next
|
||||
|
||||
if (definitionCopy.async) {
|
||||
features.hasAsyncSteps = true
|
||||
}
|
||||
|
||||
if (definitionCopy.timeout) {
|
||||
features.hasStepTimeouts = true
|
||||
}
|
||||
|
||||
if (
|
||||
definitionCopy.retryInterval ||
|
||||
definitionCopy.retryIntervalAwaiting
|
||||
) {
|
||||
features.hasRetriesTimeout = true
|
||||
}
|
||||
|
||||
if (definitionCopy.nested) {
|
||||
features.hasNestedTransactions = true
|
||||
}
|
||||
|
||||
states[id] = Object.assign(
|
||||
new TransactionStep(),
|
||||
existingSteps?.[id] || {
|
||||
id,
|
||||
uuid: definitionCopy.uuid,
|
||||
depth: level.length - 1,
|
||||
definition: definitionCopy,
|
||||
saveResponse: definitionCopy.saveResponse ?? true,
|
||||
invoke: {
|
||||
state: TransactionStepState.NOT_STARTED,
|
||||
status: TransactionStepStatus.IDLE,
|
||||
},
|
||||
compensate: {
|
||||
state: TransactionStepState.DORMANT,
|
||||
status: TransactionStepStatus.IDLE,
|
||||
},
|
||||
attempts: 0,
|
||||
failures: 0,
|
||||
lastAttempt: null,
|
||||
next: [],
|
||||
}
|
||||
)
|
||||
if (obj.action) {
|
||||
if (actionNames.has(obj.action)) {
|
||||
throw new Error(`Step ${obj.action} is already defined in workflow.`)
|
||||
}
|
||||
|
||||
actionNames.add(obj.action)
|
||||
level.push(obj.action)
|
||||
const id = level.join(".")
|
||||
const parent = level.slice(0, level.length - 1).join(".")
|
||||
|
||||
if (!existingSteps || parent === TransactionOrchestrator.ROOT_STEP) {
|
||||
states[parent].next?.push(id)
|
||||
}
|
||||
|
||||
const definitionCopy = { ...obj }
|
||||
delete definitionCopy.next
|
||||
|
||||
if (definitionCopy.async) {
|
||||
features.hasAsyncSteps = true
|
||||
}
|
||||
|
||||
if (definitionCopy.timeout) {
|
||||
features.hasStepTimeouts = true
|
||||
}
|
||||
|
||||
if (
|
||||
definitionCopy.retryInterval ||
|
||||
definitionCopy.retryIntervalAwaiting
|
||||
) {
|
||||
features.hasRetriesTimeout = true
|
||||
}
|
||||
|
||||
if (definitionCopy.nested) {
|
||||
features.hasNestedTransactions = true
|
||||
}
|
||||
|
||||
states[id] = Object.assign(
|
||||
new TransactionStep(),
|
||||
existingSteps?.[id] || {
|
||||
id,
|
||||
uuid: definitionCopy.uuid,
|
||||
depth: level.length - 1,
|
||||
definition: definitionCopy,
|
||||
saveResponse: definitionCopy.saveResponse ?? true,
|
||||
invoke: {
|
||||
state: TransactionStepState.NOT_STARTED,
|
||||
status: TransactionStepStatus.IDLE,
|
||||
},
|
||||
compensate: {
|
||||
state: TransactionStepState.DORMANT,
|
||||
status: TransactionStepStatus.IDLE,
|
||||
},
|
||||
attempts: 0,
|
||||
failures: 0,
|
||||
lastAttempt: null,
|
||||
next: [],
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
if (Array.isArray(obj.next)) {
|
||||
for (const next of obj.next) {
|
||||
queue.push({ obj: next, level: [...level] })
|
||||
}
|
||||
} else if (isObject(obj.next)) {
|
||||
queue.push({ obj: obj.next, level: [...level] })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { FindConfig } from "../common"
|
||||
import { IModuleService } from "../modules-sdk"
|
||||
import { ContainerLike, IModuleService } from "../modules-sdk"
|
||||
import { Context } from "../shared-context"
|
||||
import {
|
||||
FilterableWorkflowExecutionProps,
|
||||
@@ -28,6 +28,15 @@ export interface WorkflowOrchestratorRunDTO<T = unknown>
|
||||
transactionId?: string
|
||||
}
|
||||
|
||||
export interface WorkflowOrchestratorCancelOptionsDTO {
|
||||
transactionId: string
|
||||
context?: Context
|
||||
throwOnError?: boolean
|
||||
logOnError?: boolean
|
||||
events?: Record<string, Function>
|
||||
container?: ContainerLike
|
||||
}
|
||||
|
||||
export type IdempotencyKeyParts = {
|
||||
workflowId: string
|
||||
transactionId: string
|
||||
@@ -63,17 +72,11 @@ export interface IWorkflowEngineService extends IModuleService {
|
||||
workflowId: string,
|
||||
options?: WorkflowOrchestratorRunDTO,
|
||||
sharedContext?: Context
|
||||
): Promise<{
|
||||
errors: Error[]
|
||||
transaction: object
|
||||
result: any
|
||||
acknowledgement: Acknowledgement
|
||||
}>
|
||||
)
|
||||
|
||||
getRunningTransaction(
|
||||
workflowId: string,
|
||||
transactionId: string,
|
||||
options?: Record<string, any>,
|
||||
sharedContext?: Context
|
||||
): Promise<unknown>
|
||||
|
||||
@@ -121,4 +124,10 @@ export interface IWorkflowEngineService extends IModuleService {
|
||||
},
|
||||
sharedContext?: Context
|
||||
)
|
||||
|
||||
cancel(
|
||||
workflowId: string,
|
||||
options: WorkflowOrchestratorCancelOptionsDTO,
|
||||
sharedContext?: Context
|
||||
)
|
||||
}
|
||||
|
||||
@@ -4,4 +4,5 @@ export * from "./workflow_async"
|
||||
export * from "./workflow_conditional_step"
|
||||
export * from "./workflow_idempotent"
|
||||
export * from "./workflow_step_timeout"
|
||||
export * from "./workflow_sync"
|
||||
export * from "./workflow_transaction_timeout"
|
||||
|
||||
+64
@@ -0,0 +1,64 @@
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
StepResponse,
|
||||
WorkflowResponse,
|
||||
} from "@medusajs/framework/workflows-sdk"
|
||||
|
||||
const step_1 = createStep(
|
||||
"step_1",
|
||||
jest.fn((input) => {
|
||||
input.test = "test"
|
||||
return new StepResponse(input, { compensate: 123 })
|
||||
}),
|
||||
jest.fn((compensateInput) => {
|
||||
if (!compensateInput) {
|
||||
return
|
||||
}
|
||||
|
||||
return new StepResponse({
|
||||
reverted: true,
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
const step_2 = createStep(
|
||||
"step_2",
|
||||
jest.fn((input, context) => {
|
||||
if (input) {
|
||||
return new StepResponse({ notAsyncResponse: input.hey })
|
||||
}
|
||||
}),
|
||||
jest.fn((_, context) => {
|
||||
return new StepResponse({
|
||||
step: context.metadata.action,
|
||||
idempotency_key: context.metadata.idempotency_key,
|
||||
reverted: true,
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
const step_3 = createStep(
|
||||
"step_3",
|
||||
jest.fn((res) => {
|
||||
return new StepResponse({
|
||||
done: {
|
||||
inputFromSyncStep: res.notAsyncResponse,
|
||||
},
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
createWorkflow(
|
||||
{
|
||||
name: "workflow_sync",
|
||||
idempotent: true,
|
||||
},
|
||||
function (input) {
|
||||
step_1(input)
|
||||
|
||||
const ret2 = step_2({ hey: "oh" })
|
||||
|
||||
return new WorkflowResponse(step_3(ret2))
|
||||
}
|
||||
)
|
||||
@@ -300,6 +300,26 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
|
||||
expect(onFinish).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
|
||||
it("should cancel and revert a completed workflow", async () => {
|
||||
const workflowId = "workflow_sync"
|
||||
|
||||
const { acknowledgement, transaction: trx } =
|
||||
await workflowOrcModule.run(workflowId, {
|
||||
input: {
|
||||
value: "123",
|
||||
},
|
||||
})
|
||||
|
||||
expect(trx.getFlow().state).toEqual("done")
|
||||
expect(acknowledgement.hasFinished).toBe(true)
|
||||
|
||||
const { transaction } = await workflowOrcModule.cancel(workflowId, {
|
||||
transactionId: acknowledgement.transactionId,
|
||||
})
|
||||
|
||||
expect(transaction.getFlow().state).toEqual("reverted")
|
||||
})
|
||||
|
||||
it("should run conditional steps if condition is true", (done) => {
|
||||
void workflowOrcModule.subscribe({
|
||||
workflowId: "workflow_conditional_step",
|
||||
|
||||
@@ -6,7 +6,11 @@ import {
|
||||
TransactionStep,
|
||||
WorkflowScheduler,
|
||||
} from "@medusajs/framework/orchestration"
|
||||
import { ContainerLike, MedusaContainer } from "@medusajs/framework/types"
|
||||
import {
|
||||
ContainerLike,
|
||||
Context,
|
||||
MedusaContainer,
|
||||
} from "@medusajs/framework/types"
|
||||
import {
|
||||
isString,
|
||||
MedusaError,
|
||||
@@ -18,9 +22,9 @@ import {
|
||||
resolveValue,
|
||||
ReturnWorkflow,
|
||||
} from "@medusajs/framework/workflows-sdk"
|
||||
import { WorkflowOrchestratorCancelOptions } from "@types"
|
||||
import { ulid } from "ulid"
|
||||
import { InMemoryDistributedTransactionStorage } from "../utils"
|
||||
import { WorkflowOrchestratorCancelOptions } from "@types"
|
||||
|
||||
export type WorkflowOrchestratorRunOptions<T> = Omit<
|
||||
FlowRunOptions<T>,
|
||||
@@ -319,10 +323,8 @@ export class WorkflowOrchestratorService {
|
||||
async getRunningTransaction(
|
||||
workflowId: string,
|
||||
transactionId: string,
|
||||
options?: WorkflowOrchestratorRunOptions<unknown>
|
||||
context?: Context
|
||||
): Promise<DistributedTransactionType> {
|
||||
let { context, container } = options ?? {}
|
||||
|
||||
if (!workflowId) {
|
||||
throw new Error("Workflow ID is required")
|
||||
}
|
||||
@@ -339,9 +341,7 @@ export class WorkflowOrchestratorService {
|
||||
throw new Error(`Workflow with id "${workflowId}" not found.`)
|
||||
}
|
||||
|
||||
const flow = exportedWorkflow(
|
||||
(container as MedusaContainer) ?? this.container_
|
||||
)
|
||||
const flow = exportedWorkflow()
|
||||
|
||||
const transaction = await flow.getRunningTransaction(transactionId, context)
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import type {
|
||||
import { SqlEntityManager } from "@mikro-orm/postgresql"
|
||||
import { WorkflowExecution } from "@models"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import { WorkflowOrchestratorCancelOptions } from "@types"
|
||||
|
||||
type InjectedDependencies = {
|
||||
manager: SqlEntityManager
|
||||
@@ -185,4 +186,16 @@ export class WorkflowsModuleService<
|
||||
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
|
||||
`)
|
||||
}
|
||||
|
||||
@InjectSharedContext()
|
||||
async cancel<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
|
||||
workflowIdOrWorkflow: TWorkflow,
|
||||
options: WorkflowOrchestratorCancelOptions,
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
return this.workflowOrchestratorService_.cancel(
|
||||
workflowIdOrWorkflow,
|
||||
options
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,8 @@ export type InitializeModuleInjectableDependencies = {
|
||||
|
||||
export type WorkflowOrchestratorCancelOptions = Omit<
|
||||
FlowCancelOptions,
|
||||
"transaction" | "container"
|
||||
"transaction" | "transactionId" | "container"
|
||||
> & {
|
||||
transactionId: string
|
||||
container?: ContainerLike
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
export * from "./workflow_1"
|
||||
export * from "./workflow_2"
|
||||
export * from "./workflow_async"
|
||||
export * from "./workflow_async_compensate"
|
||||
export * from "./workflow_step_timeout"
|
||||
export * from "./workflow_sync"
|
||||
export * from "./workflow_transaction_timeout"
|
||||
export * from "./workflow_when"
|
||||
export * from "./workflow_async_compensate"
|
||||
|
||||
+64
@@ -0,0 +1,64 @@
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
StepResponse,
|
||||
WorkflowResponse,
|
||||
} from "@medusajs/framework/workflows-sdk"
|
||||
|
||||
const step_1 = createStep(
|
||||
"step_1",
|
||||
jest.fn((input) => {
|
||||
input.test = "test"
|
||||
return new StepResponse(input, { compensate: 123 })
|
||||
}),
|
||||
jest.fn((compensateInput) => {
|
||||
if (!compensateInput) {
|
||||
return
|
||||
}
|
||||
|
||||
return new StepResponse({
|
||||
reverted: true,
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
const step_2 = createStep(
|
||||
"step_2",
|
||||
jest.fn((input, context) => {
|
||||
if (input) {
|
||||
return new StepResponse({ notAsyncResponse: input.hey })
|
||||
}
|
||||
}),
|
||||
jest.fn((_, context) => {
|
||||
return new StepResponse({
|
||||
step: context.metadata.action,
|
||||
idempotency_key: context.metadata.idempotency_key,
|
||||
reverted: true,
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
const step_3 = createStep(
|
||||
"step_3",
|
||||
jest.fn((res) => {
|
||||
return new StepResponse({
|
||||
done: {
|
||||
inputFromSyncStep: res.notAsyncResponse,
|
||||
},
|
||||
})
|
||||
})
|
||||
)
|
||||
|
||||
createWorkflow(
|
||||
{
|
||||
name: "workflow_sync",
|
||||
idempotent: true,
|
||||
},
|
||||
function (input) {
|
||||
step_1(input)
|
||||
|
||||
const ret2 = step_2({ hey: "oh" })
|
||||
|
||||
return new WorkflowResponse(step_3(ret2))
|
||||
}
|
||||
)
|
||||
@@ -512,6 +512,26 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
|
||||
|
||||
failTrap(done)
|
||||
})
|
||||
|
||||
it("should cancel and revert a completed workflow", async () => {
|
||||
const workflowId = "workflow_sync"
|
||||
|
||||
const { acknowledgement, transaction: trx } =
|
||||
await workflowOrcModule.run(workflowId, {
|
||||
input: {
|
||||
value: "123",
|
||||
},
|
||||
})
|
||||
|
||||
expect(trx.getFlow().state).toEqual("done")
|
||||
expect(acknowledgement.hasFinished).toBe(true)
|
||||
|
||||
const { transaction } = await workflowOrcModule.cancel(workflowId, {
|
||||
transactionId: acknowledgement.transactionId,
|
||||
})
|
||||
|
||||
expect(transaction.getFlow().state).toEqual("reverted")
|
||||
})
|
||||
})
|
||||
|
||||
// Note: These tests depend on actual Redis instance and waiting for the scheduled jobs to run, which isn't great.
|
||||
|
||||
@@ -35,8 +35,11 @@ export type WorkflowOrchestratorRunOptions<T> = Omit<
|
||||
|
||||
export type WorkflowOrchestratorCancelOptions = Omit<
|
||||
FlowCancelOptions,
|
||||
"transaction"
|
||||
>
|
||||
"transaction" | "transactionId" | "container"
|
||||
> & {
|
||||
transactionId: string
|
||||
container?: ContainerLike
|
||||
}
|
||||
|
||||
type RegisterStepSuccessOptions<T> = Omit<
|
||||
WorkflowOrchestratorRunOptions<T>,
|
||||
@@ -379,10 +382,8 @@ export class WorkflowOrchestratorService {
|
||||
async getRunningTransaction(
|
||||
workflowId: string,
|
||||
transactionId: string,
|
||||
options?: { context?: Context }
|
||||
context?: Context
|
||||
): Promise<DistributedTransactionType> {
|
||||
let { context } = options ?? {}
|
||||
|
||||
if (!workflowId) {
|
||||
throw new Error("Workflow ID is required")
|
||||
}
|
||||
@@ -398,10 +399,9 @@ export class WorkflowOrchestratorService {
|
||||
throw new Error(`Workflow with id "${workflowId}" not found.`)
|
||||
}
|
||||
|
||||
const transaction = await exportedWorkflow.getRunningTransaction(
|
||||
transactionId,
|
||||
context
|
||||
)
|
||||
const flow = exportedWorkflow()
|
||||
|
||||
const transaction = await flow.getRunningTransaction(transactionId, context)
|
||||
|
||||
return transaction
|
||||
}
|
||||
|
||||
@@ -17,7 +17,10 @@ import type {
|
||||
} from "@medusajs/framework/workflows-sdk"
|
||||
import { SqlEntityManager } from "@mikro-orm/postgresql"
|
||||
import { WorkflowExecution } from "@models"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import {
|
||||
WorkflowOrchestratorCancelOptions,
|
||||
WorkflowOrchestratorService,
|
||||
} from "@services"
|
||||
|
||||
type InjectedDependencies = {
|
||||
manager: SqlEntityManager
|
||||
@@ -112,7 +115,7 @@ export class WorkflowsModuleService<
|
||||
return await this.workflowOrchestratorService_.getRunningTransaction(
|
||||
workflowId,
|
||||
transactionId,
|
||||
{ context }
|
||||
context
|
||||
)
|
||||
}
|
||||
|
||||
@@ -194,4 +197,13 @@ export class WorkflowsModuleService<
|
||||
updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time);
|
||||
`)
|
||||
}
|
||||
|
||||
@InjectSharedContext()
|
||||
async cancel(
|
||||
workflowId: string,
|
||||
options: WorkflowOrchestratorCancelOptions,
|
||||
@MedusaContext() context: Context = {}
|
||||
) {
|
||||
return this.workflowOrchestratorService_.cancel(workflowId, options)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user