feat(workflows-*): Allow to re run non idempotent but stored workflow with the same transaction id if considered done (#12362)

This commit is contained in:
Adrien de Peretti
2025-05-06 17:17:49 +02:00
committed by GitHub
parent 97dd520c64
commit 80007f3afd
31 changed files with 809 additions and 95 deletions

View File

@@ -6,3 +6,4 @@ export * from "./workflow_idempotent"
export * from "./workflow_step_timeout"
export * from "./workflow_sync"
export * from "./workflow_transaction_timeout"
export * from "./workflow_not_idempotent_with_retention"

View File

@@ -0,0 +1,71 @@
import {
createStep,
createWorkflow,
StepResponse,
} from "@medusajs/framework/workflows-sdk"
const step_1 = createStep(
"step_1",
jest.fn((input) => {
input.test = "test"
return new StepResponse(input, { compensate: 123 })
}),
jest.fn((compensateInput) => {
if (!compensateInput) {
return
}
return new StepResponse({
reverted: true,
})
})
)
export const workflowNotIdempotentWithRetentionStep2Invoke = jest.fn(
(input, context) => {
if (input) {
return new StepResponse({ notAsyncResponse: input.hey })
}
}
)
const step_2 = createStep(
"step_2",
workflowNotIdempotentWithRetentionStep2Invoke,
jest.fn((_, context) => {
return new StepResponse({
step: context.metadata.action,
idempotency_key: context.metadata.idempotency_key,
reverted: true,
})
})
)
export const workflowNotIdempotentWithRetentionStep3Invoke = jest.fn((res) => {
return new StepResponse({
done: {
inputFromSyncStep: res.notAsyncResponse,
},
})
})
const step_3 = createStep(
"step_3",
workflowNotIdempotentWithRetentionStep3Invoke
)
createWorkflow(
{
name: "workflow_not_idempotent_with_retention",
retentionTime: 60,
},
function (input) {
step_1(input)
step_2({ hey: "oh" })
const ret2 = step_2({ hey: "hello" }).config({
name: "new_step_name",
})
return step_3(ret2)
}
)

View File

@@ -1,5 +1,6 @@
import {
DistributedTransactionType,
TransactionState,
WorkflowManager,
} from "@medusajs/framework/orchestration"
import {
@@ -10,6 +11,7 @@ import {
import {
Module,
Modules,
promiseAll,
TransactionHandlerType,
} from "@medusajs/framework/utils"
import {
@@ -21,6 +23,7 @@ import {
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
import { WorkflowsModuleService } from "@services"
import { asFunction } from "awilix"
import { ulid } from "ulid"
import { setTimeout as setTimeoutSync } from "timers"
import { setTimeout as setTimeoutPromise } from "timers/promises"
import "../__fixtures__"
@@ -29,6 +32,8 @@ import {
conditionalStep3Invoke,
workflow2Step2Invoke,
workflow2Step3Invoke,
workflowNotIdempotentWithRetentionStep2Invoke,
workflowNotIdempotentWithRetentionStep3Invoke,
} from "../__fixtures__"
import {
eventGroupWorkflowId,
@@ -43,7 +48,7 @@ const failTrap = (done) => {
setTimeoutSync(() => {
// REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending
console.warn(
"Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually."
`Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually.`
)
done()
}, 5000)
@@ -94,10 +99,55 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
serviceName: "workflows",
field: "workflowExecution",
},
run_id: {
linkable: "workflow_execution_run_id",
entity: "WorkflowExecution",
primaryKey: "run_id",
serviceName: "workflows",
field: "workflowExecution",
},
},
})
})
it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => {
const transactionId = "transaction_id"
const workflowId = "workflow_id" + ulid()
const step1 = createStep("step1", async () => {
await setTimeoutPromise(100)
return new StepResponse("step1")
})
createWorkflow(
{
name: workflowId,
retentionTime: 1000,
},
function () {
return new WorkflowResponse(step1())
}
)
const [result1, result2] = await promiseAll([
workflowOrcModule.run(workflowId, {
input: {},
transactionId,
}),
workflowOrcModule
.run(workflowId, {
input: {},
transactionId,
})
.catch((e) => e),
])
expect(result1.result).toEqual("step1")
expect(result2.message).toEqual(
"Transaction already started for transactionId: " + transactionId
)
})
it("should execute an async workflow keeping track of the event group id provided in the context", async () => {
const eventGroupId = "event-group-id"
@@ -232,6 +282,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
{ obj: "return from 2" },
{ obj: "return from 3" },
])
done()
}
},
})
@@ -309,13 +360,13 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
stepResponse: { uhuuuu: "yeaah!" },
})
expect(workflow2Step2Invoke).toBeCalledTimes(2)
expect(workflow2Step2Invoke).toHaveBeenCalledTimes(2)
expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" })
expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({
hey: "async hello",
})
expect(workflow2Step3Invoke).toBeCalledTimes(1)
expect(workflow2Step3Invoke).toHaveBeenCalledTimes(1)
expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({
uhuuuu: "yeaah!",
})
@@ -327,6 +378,72 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(executionsList).toHaveLength(1)
})
it("should return a list of workflow executions and keep it saved when there is a retentionTime set but allow for executing the same workflow multiple times with different run_id if the workflow is considered done", async () => {
const transactionId = "transaction_1"
await workflowOrcModule.run(
"workflow_not_idempotent_with_retention",
{
input: {
value: "123",
},
transactionId,
}
)
let { data: executionsList } = await query.graph({
entity: "workflow_executions",
fields: ["id", "run_id", "transaction_id"],
})
expect(executionsList).toHaveLength(1)
expect(
workflowNotIdempotentWithRetentionStep2Invoke
).toHaveBeenCalledTimes(2)
expect(
workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[0][0]
).toEqual({ hey: "oh" })
expect(
workflowNotIdempotentWithRetentionStep2Invoke.mock.calls[1][0]
).toEqual({
hey: "hello",
})
expect(
workflowNotIdempotentWithRetentionStep3Invoke
).toHaveBeenCalledTimes(1)
expect(
workflowNotIdempotentWithRetentionStep3Invoke.mock.calls[0][0]
).toEqual({
notAsyncResponse: "hello",
})
await workflowOrcModule.run(
"workflow_not_idempotent_with_retention",
{
input: {
value: "123",
},
transactionId,
}
)
const { data: executionsList2 } = await query.graph({
entity: "workflow_executions",
filters: {
id: { $nin: executionsList.map((e) => e.id) },
},
fields: ["id", "run_id", "transaction_id"],
})
expect(executionsList2).toHaveLength(1)
expect(executionsList2[0].run_id).not.toEqual(
executionsList[0].run_id
)
expect(executionsList2[0].transaction_id).toEqual(
executionsList[0].transaction_id
)
})
it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction } = (await workflowOrcModule.run(
"workflow_step_timeout",
@@ -402,6 +519,37 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(transaction.getFlow().state).toEqual("reverted")
})
it("should cancel and revert a non idempotent completed workflow with rentention time given a specific transaction id", async () => {
const workflowId = "workflow_not_idempotent_with_retention"
const transactionId = "trx_123"
await workflowOrcModule.run(workflowId, {
input: {
value: "123",
},
transactionId,
})
let executions = await workflowOrcModule.listWorkflowExecutions({
transaction_id: transactionId,
})
expect(executions.length).toEqual(1)
expect(executions[0].state).toEqual(TransactionState.DONE)
expect(executions[0].transaction_id).toEqual(transactionId)
await workflowOrcModule.cancel(workflowId, {
transactionId,
})
executions = await workflowOrcModule.listWorkflowExecutions({
transaction_id: transactionId,
})
expect(executions.length).toEqual(1)
expect(executions[0].state).toEqual(TransactionState.REVERTED)
})
it("should run conditional steps if condition is true", (done) => {
void workflowOrcModule.subscribe({
workflowId: "workflow_conditional_step",
@@ -449,8 +597,8 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
describe("Scheduled workflows", () => {
beforeEach(() => {
jest.useFakeTimers()
jest.clearAllMocks()
jest.useFakeTimers()
// Register test-value in the container for all tests
const sharedContainer =

View File

@@ -29,7 +29,7 @@
"resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json",
"build": "rimraf dist && tsc --build && npm run resolve:aliases",
"test": "jest --passWithNoTests --runInBand --bail --forceExit -- src",
"test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts",
"test:integration": "jest --forceExit -- integration-tests/**/__tests__/*.ts",
"migration:initial": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create --initial",
"migration:create": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create",
"migration:up": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:up",

View File

@@ -24,6 +24,15 @@
"nullable": false,
"mappedType": "text"
},
"run_id": {
"name": "run_id",
"type": "text",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"mappedType": "text"
},
"id": {
"name": "id",
"type": "text",
@@ -151,13 +160,13 @@
"expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique",
"keyName": "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique",
"columnNames": [],
"composite": false,
"constraint": false,
"primary": false,
"unique": false,
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL"
"expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_run_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL"
},
{
"keyName": "IDX_workflow_execution_state",
@@ -172,7 +181,8 @@
"keyName": "workflow_execution_pkey",
"columnNames": [
"workflow_id",
"transaction_id"
"transaction_id",
"run_id"
],
"composite": true,
"constraint": true,

View File

@@ -0,0 +1,45 @@
import { Migration } from "@mikro-orm/migrations"
import { ulid } from "ulid"
export class Migration20250505092459 extends Migration {
override async up(): Promise<void> {
this.addSql(
`alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_workflow_id_transaction_id_run_id_unique";`
)
this.addSql(
`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_unique";`
)
this.addSql(
`alter table if exists "workflow_execution" drop constraint if exists "PK_workflow_execution_workflow_id_transaction_id";`
)
this.addSql(
`alter table if exists "workflow_execution" add column if not exists "run_id" text not null default '${ulid()}';`
)
this.addSql(
`CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique" ON "workflow_execution" (workflow_id, transaction_id, run_id) WHERE deleted_at IS NULL;`
)
this.addSql(
`alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id", "run_id");`
)
}
override async down(): Promise<void> {
this.addSql(
`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id_run_id_unique";`
)
this.addSql(
`alter table if exists "workflow_execution" drop constraint if exists "workflow_execution_pkey";`
)
this.addSql(
`alter table if exists "workflow_execution" drop column if exists "run_id";`
)
this.addSql(
`CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id_unique" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`
)
this.addSql(
`alter table if exists "workflow_execution" add constraint "workflow_execution_pkey" primary key ("workflow_id", "transaction_id");`
)
}
}

View File

@@ -6,6 +6,7 @@ export const WorkflowExecution = model
id: model.id({ prefix: "wf_exec" }),
workflow_id: model.text().primaryKey(),
transaction_id: model.text().primaryKey(),
run_id: model.text().primaryKey(),
execution: model.json().nullable(),
context: model.json().nullable(),
state: model.enum(TransactionState),
@@ -25,7 +26,7 @@ export const WorkflowExecution = model
where: "deleted_at IS NULL",
},
{
on: ["workflow_id", "transaction_id"],
on: ["workflow_id", "transaction_id", "run_id"],
unique: true,
where: "deleted_at IS NULL",
},

View File

@@ -16,6 +16,7 @@ type WorkflowExecution {
deleted_at: DateTime
workflow_id: string
transaction_id: string
run_id: string
execution: JSON
context: JSON
state: TransactionState

View File

@@ -31,6 +31,7 @@ export type WorkflowOrchestratorRunOptions<T> = Omit<
"container"
> & {
transactionId?: string
runId?: string
container?: ContainerLike
}
@@ -140,7 +141,7 @@ export class WorkflowOrchestratorService {
let { throwOnError, context } = options ?? {}
throwOnError ??= true
context ??= {}
context.transactionId = transactionId ?? ulid()
context.transactionId = transactionId ?? "auto-" + ulid()
const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
@@ -259,7 +260,10 @@ export class WorkflowOrchestratorService {
const transaction = await this.getRunningTransaction(
workflowId,
transactionId,
options
{
...options,
isCancelling: true,
}
)
if (!transaction) {

View File

@@ -5,11 +5,17 @@ import {
SchedulerOptions,
SkipExecutionError,
TransactionCheckpoint,
TransactionContext,
TransactionFlow,
TransactionOptions,
TransactionStep,
TransactionStepError,
} from "@medusajs/framework/orchestration"
import { Logger, ModulesSdkTypes } from "@medusajs/framework/types"
import {
InferEntityType,
Logger,
ModulesSdkTypes,
} from "@medusajs/framework/types"
import {
MedusaError,
TransactionState,
@@ -19,6 +25,7 @@ import {
} from "@medusajs/framework/utils"
import { WorkflowOrchestratorService } from "@services"
import { type CronExpression, parseExpression } from "cron-parser"
import { WorkflowExecution } from "../models/workflow-execution"
function parseNextExecution(
optionsOrExpression: SchedulerOptions | CronExpression | string | number
@@ -86,6 +93,7 @@ export class InMemoryDistributedTransactionStorage
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
run_id: data.flow.runId,
execution: data.flow,
context: {
data: data.context,
@@ -102,13 +110,16 @@ export class InMemoryDistributedTransactionStorage
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
run_id: data.flow.runId,
},
])
}
async get(
key: string,
options?: TransactionOptions
options?: TransactionOptions & {
isCancelling?: boolean
}
): Promise<TransactionCheckpoint | undefined> {
const data = this.storage.get(key)
@@ -122,23 +133,53 @@ export class InMemoryDistributedTransactionStorage
}
const [_, workflowId, transactionId] = key.split(":")
const trx = await this.workflowExecutionService_
.retrieve(
{
workflow_id: workflowId,
transaction_id: transactionId,
},
{
select: ["execution", "context"],
}
)
.catch(() => undefined)
const trx: InferEntityType<typeof WorkflowExecution> | undefined =
await this.workflowExecutionService_
.list(
{
workflow_id: workflowId,
transaction_id: transactionId,
},
{
select: ["execution", "context"],
order: {
id: "desc",
},
take: 1,
}
)
.then((trx) => trx[0])
.catch(() => undefined)
if (trx) {
const execution = trx.execution as TransactionFlow
if (!idempotent) {
const isFailedOrReverted = [
TransactionState.REVERTED,
TransactionState.FAILED,
].includes(execution.state)
const isDone = execution.state === TransactionState.DONE
const isCancellingAndFailedOrReverted =
options?.isCancelling && isFailedOrReverted
const isNotCancellingAndDoneOrFailedOrReverted =
!options?.isCancelling && (isDone || isFailedOrReverted)
if (
isCancellingAndFailedOrReverted ||
isNotCancellingAndDoneOrFailedOrReverted
) {
return
}
}
return {
flow: trx.execution,
context: trx.context.data,
errors: trx.context.errors,
flow: trx.execution as TransactionFlow,
context: trx.context?.data as TransactionContext,
errors: trx.context?.errors as TransactionStepError[],
}
}
@@ -181,6 +222,20 @@ export class InMemoryDistributedTransactionStorage
}
// Store in memory
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isManualTransactionId = !data.flow.transactionId.startsWith("auto-")
if (isNotStarted && isManualTransactionId) {
const storedData = this.storage.get(key)
if (storedData) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
"Transaction already started for transactionId: " +
data.flow.transactionId
)
}
}
this.storage.set(key, data)
// Optimize DB operations - only perform when necessary
@@ -206,15 +261,23 @@ export class InMemoryDistributedTransactionStorage
key: string
options?: TransactionOptions
}) {
const isInitialCheckpoint = data.flow.state === TransactionState.NOT_STARTED
const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes(
data.flow.state
)
/**
* In case many execution can succeed simultaneously, we need to ensure that the latest
* execution does continue if a previous execution is considered finished
*/
const currentFlow = data.flow
const getOptions = {
...options,
isCancelling: !!data.flow.cancelledAt,
} as Parameters<typeof this.get>[1]
const { flow: latestUpdatedFlow } =
(await this.get(key, options)) ??
(await this.get(key, getOptions)) ??
({ flow: {} } as { flow: TransactionFlow })
if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) {