chore(): improve workflow engine storage (#13345)

* chore(workflow-engines): Improve race condition management

* cleanup

* cleanup

* chore(workflow-engines): Improve race condition management

* chore(workflow-engines): Improve race condition management

* chore(workflow-engines): heartbeat extend TTL

* Refactor chore title for workflow engine improvements

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* update tests

* revert idempotent

* add run_id index + await deletion

* improve saving

* comment

* remove only

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-09-02 11:18:12 +02:00
committed by GitHub
parent b85a46e85b
commit bd206cb250
10 changed files with 255 additions and 48 deletions

View File

@@ -144,7 +144,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
describe("Cancel transaction", function () {
it("should cancel an ongoing execution with async unfinished yet step", (done) => {
const transactionId = "transaction-to-cancel-id"
const transactionId = "transaction-to-cancel-id" + ulid()
const step1 = createStep("step1", async () => {
return new StepResponse("step1")
})
@@ -205,7 +205,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
it("should cancel a complete execution with a sync workflow running as async", async () => {
const workflowId = "workflow-to-cancel-id" + ulid()
const transactionId = "transaction-to-cancel-id"
const transactionId = "transaction-to-cancel-id" + ulid()
const step1 = createStep("step1", async () => {
return new StepResponse("step1")
})
@@ -257,7 +257,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
it("should cancel an ongoing execution with a sync workflow running as async", async () => {
const workflowId = "workflow-to-cancel-id" + ulid()
const transactionId = "transaction-to-cancel-id"
const transactionId = "transaction-to-cancel-id" + ulid()
const step1 = createStep("step1", async () => {
return new StepResponse("step1")
})
@@ -309,7 +309,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should cancel an ongoing execution with sync steps only", async () => {
const transactionId = "transaction-to-cancel-id"
const transactionId = "transaction-to-cancel-id" + ulid()
const step1 = createStep("step1", async () => {
return new StepResponse("step1")
})
@@ -356,7 +356,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => {
const transactionId = "transaction_id"
const transactionId = "transaction_id" + ulid()
const workflowId = "workflow_id" + ulid()
const step1 = createStep("step1", async () => {
@@ -397,10 +397,11 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
it("should execute an async workflow keeping track of the event group id provided in the context", async () => {
const eventGroupId = "event-group-id"
const transactionId = "transaction_id" + ulid()
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
transactionId: "transaction_id",
transactionId,
context: {
eventGroupId,
},
@@ -412,7 +413,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id",
transactionId,
},
stepResponse: { hey: "oh" },
})
@@ -427,9 +428,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => {
const transactionId = "transaction_id_2" + ulid()
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
transactionId: "transaction_id_2",
transactionId,
throwOnError: true,
})
@@ -438,7 +440,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id_2",
transactionId,
},
stepResponse: { hey: "oh" },
})
@@ -582,12 +584,13 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should return a list of workflow executions and keep it saved when there is a retentionTime set", async () => {
const transactionId = "transaction_1" + ulid()
await workflowOrcModule.run("workflow_2", {
input: {
value: "123",
},
throwOnError: true,
transactionId: "transaction_1",
transactionId,
})
let { data: executionsList } = await query.graph({
@@ -602,7 +605,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_2",
transactionId: "transaction_1",
transactionId,
},
stepResponse: { uhuuuu: "yeaah!" },
})
@@ -624,7 +627,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should return a list of workflow executions and keep it saved when there is a retentionTime set but allow for executing the same workflow multiple times with different run_id if the workflow is considered done", async () => {
const transactionId = "transaction_1"
const transactionId = "transaction_1" + ulid()
await workflowOrcModule.run(
"workflow_not_idempotent_with_retention",
{
@@ -716,7 +719,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should subscribe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const transactionId = "trx_123" + ulid()
const onFinish = jest.fn(() => {
done()
@@ -766,7 +769,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
it("should cancel and revert a non idempotent completed workflow with rentention time given a specific transaction id", async () => {
const workflowId = "workflow_not_idempotent_with_retention"
const transactionId = "trx_123"
const transactionId = "trx_123" + ulid()
await workflowOrcModule.run(workflowId, {
input: {
@@ -919,6 +922,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
it("should fetch an idempotent workflow after its completion", async () => {
const transactionId = "transaction_1" + ulid()
const { transaction: firstRun } = (await workflowOrcModule.run(
"workflow_idempotent",
{
@@ -926,7 +930,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
value: "123",
},
throwOnError: true,
transactionId: "transaction_1",
transactionId,
}
)) as Awaited<{ transaction: DistributedTransactionType }>
@@ -942,7 +946,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
value: "123",
},
throwOnError: true,
transactionId: "transaction_1",
transactionId,
}
)) as Awaited<{ transaction: DistributedTransactionType }>

View File

@@ -1,13 +1,20 @@
import { Migration } from '@mikro-orm/migrations';
import { Migration } from "@mikro-orm/migrations"
export class Migration20250819104213 extends Migration {
override async up(): Promise<void> {
this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`);
this.addSql(
`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`
)
this
.addSql(`CREATE EXTENSION IF NOT EXISTS pgcrypto; -- required for gen_random_uuid()
`)
this.addSql(
`ALTER TABLE "workflow_execution" ALTER COLUMN "id" SET DEFAULT 'wf_exec_' || encode(gen_random_bytes(6), 'hex');`
)
}
override async down(): Promise<void> {
this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`);
this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`)
}
}

View File

@@ -0,0 +1,21 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20250819110924 extends Migration {
override async up(): Promise<void> {
this.addSql(
`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;`
)
this.addSql(
`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`
)
}
override async down(): Promise<void> {
this.addSql(
`drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";`
)
this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`)
}
}

View File

@@ -119,6 +119,68 @@ export class InMemoryDistributedTransactionStorage
}
private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) {
const isNotStarted = data.flow.state === TransactionState.NOT_STARTED
const isFinished = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
/**
* Bit of explanation:
*
* When a workflow run, it run all sync step in memory until it reaches a async step.
* In that case, it might handover to another process to continue the execution. Thats why
* we need to save the current state of the flow. Then from there, it will run again all
* sync steps until the next async step. an so on so forth.
*
* To summarize, we only trully need to save the data when we are reaching any steps that
* trigger a handover to a potential other process.
*
* This allows us to spare some resources and time by not over communicating with the external
* database when it is not really needed
*/
const isFlowInvoking = data.flow.state === TransactionState.INVOKING
const stepsArray = Object.values(data.flow.steps) as TransactionStep[]
let currentStep!: TransactionStep
const targetStates = isFlowInvoking
? [
TransactionStepState.INVOKING,
TransactionStepState.DONE,
TransactionStepState.FAILED,
]
: [TransactionStepState.COMPENSATING]
// Find the current step from the end
for (let i = stepsArray.length - 1; i >= 0; i--) {
const step = stepsArray[i]
if (step.id === "_root") {
break
}
const isTargetState = targetStates.includes(step.invoke?.state)
if (isTargetState) {
currentStep = step
break
}
}
const currentStepsIsAsync = currentStep
? stepsArray.some(
(step) =>
step?.definition?.async === true && step.depth === currentStep.depth
)
: false
if (!(isNotStarted || isFinished) && !currentStepsIsAsync) {
return
}
await this.workflowExecutionService_.upsert([
{
workflow_id: data.flow.modelId,
@@ -285,6 +347,12 @@ export class InMemoryDistributedTransactionStorage
key: string
options?: TransactionOptions
}) {
// TODO: comment, we have been able to try to replace this entire function
// with a locking first approach. We might come back to that another time.
// This remove the necessity of all the below logic to prevent race conditions
// by preventing the exact same execution to run at the same time.
// See early commits from: https://github.com/medusajs/medusa/pull/13345/commits
const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes(
data.flow.state
)