diff --git a/.changeset/gentle-beers-pretend.md b/.changeset/gentle-beers-pretend.md new file mode 100644 index 0000000000..34d1691aab --- /dev/null +++ b/.changeset/gentle-beers-pretend.md @@ -0,0 +1,6 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +--- + +Chore(): workflow engine race condition improvements diff --git a/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts b/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts index 60340517c6..8eb6eadc7f 100644 --- a/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts +++ b/integration-tests/http/__tests__/promotions/admin/promotions.spec.ts @@ -8,7 +8,7 @@ import { import { setupTaxStructure } from "../../../../modules/__tests__/fixtures/tax" import { medusaTshirtProduct } from "../../../__fixtures__/product" -jest.setTimeout(50000000) +jest.setTimeout(50000) const adminHeaders = { headers: { "x-medusa-access-token": "test_token" }, @@ -716,7 +716,6 @@ medusaIntegrationTestRunner({ adminHeaders ) - // Simulate concurrent requests await Promise.all([ api .post( @@ -727,7 +726,6 @@ medusaIntegrationTestRunner({ storeHeaders ) .catch(() => {}), - /* api .post( `/store/carts/${cart.id}`, @@ -737,7 +735,6 @@ medusaIntegrationTestRunner({ storeHeaders ) .catch(() => {}), - */ ]) const cartAfterPromotion = ( diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index f1f9aa8ef4..fecd170ea5 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -144,7 +144,7 @@ moduleIntegrationTestRunner({ describe("Cancel transaction", function () { it("should cancel an ongoing execution with async unfinished yet step", (done) => { - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -205,7 +205,7 @@ moduleIntegrationTestRunner({ it("should cancel a complete execution with a sync workflow running as async", async () => { const workflowId = "workflow-to-cancel-id" + ulid() - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -257,7 +257,7 @@ moduleIntegrationTestRunner({ it("should cancel an ongoing execution with a sync workflow running as async", async () => { const workflowId = "workflow-to-cancel-id" + ulid() - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -309,7 +309,7 @@ moduleIntegrationTestRunner({ }) it("should cancel an ongoing execution with sync steps only", async () => { - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -356,7 +356,7 @@ moduleIntegrationTestRunner({ }) it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { - const transactionId = "transaction_id" + const transactionId = 
"transaction_id" + ulid() const workflowId = "workflow_id" + ulid() const step1 = createStep("step1", async () => { @@ -397,10 +397,11 @@ moduleIntegrationTestRunner({ it("should execute an async workflow keeping track of the event group id provided in the context", async () => { const eventGroupId = "event-group-id" + const transactionId = "transaction_id" + ulid() await workflowOrcModule.run(eventGroupWorkflowId, { input: {}, - transactionId: "transaction_id", + transactionId, context: { eventGroupId, }, @@ -412,7 +413,7 @@ moduleIntegrationTestRunner({ action: TransactionHandlerType.INVOKE, stepId: "step_1_event_group_id_background", workflowId: eventGroupWorkflowId, - transactionId: "transaction_id", + transactionId, }, stepResponse: { hey: "oh" }, }) @@ -427,9 +428,10 @@ moduleIntegrationTestRunner({ }) it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { + const transactionId = "transaction_id_2" + ulid() await workflowOrcModule.run(eventGroupWorkflowId, { input: {}, - transactionId: "transaction_id_2", + transactionId, throwOnError: true, }) @@ -438,7 +440,7 @@ moduleIntegrationTestRunner({ action: TransactionHandlerType.INVOKE, stepId: "step_1_event_group_id_background", workflowId: eventGroupWorkflowId, - transactionId: "transaction_id_2", + transactionId, }, stepResponse: { hey: "oh" }, }) @@ -582,12 +584,13 @@ moduleIntegrationTestRunner({ }) it("should return a list of workflow executions and keep it saved when there is a retentionTime set", async () => { + const transactionId = "transaction_1" + ulid() await workflowOrcModule.run("workflow_2", { input: { value: "123", }, throwOnError: true, - transactionId: "transaction_1", + transactionId, }) let { data: executionsList } = await query.graph({ @@ -602,7 +605,7 @@ moduleIntegrationTestRunner({ action: TransactionHandlerType.INVOKE, stepId: "new_step_name", workflowId: "workflow_2", - transactionId: "transaction_1", + transactionId, }, stepResponse: { uhuuuu: "yeaah!" 
}, }) @@ -624,7 +627,7 @@ }) it("should return a list of workflow executions and keep it saved when there is a retentionTime set but allow for executing the same workflow multiple times with different run_id if the workflow is considered done", async () => { - const transactionId = "transaction_1" + const transactionId = "transaction_1" + ulid() await workflowOrcModule.run( "workflow_not_idempotent_with_retention", { @@ -716,7 +719,7 @@ }) it("should subscribe to a async workflow and receive the response when it finishes", (done) => { - const transactionId = "trx_123" + const transactionId = "trx_123" + ulid() const onFinish = jest.fn(() => { done() @@ -766,7 +769,7 @@ it("should cancel and revert a non idempotent completed workflow with rentention time given a specific transaction id", async () => { const workflowId = "workflow_not_idempotent_with_retention" - const transactionId = "trx_123" + const transactionId = "trx_123" + ulid() await workflowOrcModule.run(workflowId, { input: { @@ -919,6 +922,7 @@ }) it("should fetch an idempotent workflow after its completion", async () => { + const transactionId = "transaction_1" + ulid() const { transaction: firstRun } = (await workflowOrcModule.run( "workflow_idempotent", { @@ -926,7 +930,7 @@ value: "123", }, throwOnError: true, - transactionId: "transaction_1", + transactionId, } )) as Awaited<{ transaction: DistributedTransactionType }> @@ -942,7 +946,7 @@ value: "123", }, throwOnError: true, - transactionId: "transaction_1", + transactionId, } )) as Awaited<{ transaction: DistributedTransactionType }> diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts index d758bc325d..e929b0ea6d 100644 --- a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819104213.ts @@ -1,13 +1,20 @@ -import { Migration } from '@mikro-orm/migrations'; +import { Migration } from "@mikro-orm/migrations" export class Migration20250819104213 extends Migration { - override async up(): Promise<void> { - this.addSql(`CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;`); + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;` + ) + + this + .addSql(`CREATE EXTENSION IF NOT EXISTS pgcrypto; -- required for gen_random_bytes() +`) + this.addSql( + `ALTER TABLE "workflow_execution" ALTER COLUMN "id" SET DEFAULT 'wf_exec_' || encode(gen_random_bytes(6), 'hex');` + ) } override async down(): Promise<void> { - this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`); + this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`) } - } diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819110924.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819110924.ts new file mode 100644 index 0000000000..8b52039fa5 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20250819110924.ts @@ -0,0 +1,21 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250819110924 extends Migration { + override async up():
Promise<void> { + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;` + ) + + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;` + ) + } + + override async down(): Promise<void> { + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";` + ) + + this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 9b968766cd..09b48c83af 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -119,6 +119,68 @@ export class InMemoryDistributedTransactionStorage } private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { + const isNotStarted = data.flow.state === TransactionState.NOT_STARTED + const isFinished = [ + TransactionState.DONE, + TransactionState.FAILED, + TransactionState.REVERTED, + ].includes(data.flow.state) + + /** + * Bit of explanation: + * + * When a workflow runs, it runs all sync steps in memory until it reaches an async step. + * In that case, it might hand over to another process to continue the execution. That's why + * we need to save the current state of the flow. From there, it will again run all + * sync steps until the next async step, and so on. + * + * To summarize, we only truly need to save the data when we reach a step that + * triggers a handover to a potentially different process. + * + * This allows us to spare some resources and time by not over-communicating with the external + * database when it is not really needed. + */ + + const isFlowInvoking = data.flow.state === TransactionState.INVOKING + + const stepsArray = Object.values(data.flow.steps) as TransactionStep[] + let currentStep!: TransactionStep + + const targetStates = isFlowInvoking + ? [ + TransactionStepState.INVOKING, + TransactionStepState.DONE, + TransactionStepState.FAILED, + ] + : [TransactionStepState.COMPENSATING] + + // Find the current step from the end + for (let i = stepsArray.length - 1; i >= 0; i--) { + const step = stepsArray[i] + + if (step.id === "_root") { + break + } + + const isTargetState = targetStates.includes(step.invoke?.state) + + if (isTargetState) { + currentStep = step + break + } + } + + const currentStepsIsAsync = currentStep + ? stepsArray.some( + (step) => + step?.definition?.async === true && step.depth === currentStep.depth + ) + : false + + if (!(isNotStarted || isFinished) && !currentStepsIsAsync) { + return + } + await this.workflowExecutionService_.upsert([ { workflow_id: data.flow.modelId, @@ -285,6 +347,12 @@ export class InMemoryDistributedTransactionStorage key: string options?: TransactionOptions }) { + // TODO: we tried replacing this entire function with a locking-first approach. + // We might come back to that another time. It removes the need for all the logic + // below that prevents race conditions, by preventing the exact same execution + // from running at the same time.
+ // See early commits from: https://github.com/medusajs/medusa/pull/13345/commits + const isInitialCheckpoint = [TransactionState.NOT_STARTED].includes( data.flow.state ) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index e0bc17bd1e..6ef64f77f7 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -39,6 +39,7 @@ import { } from "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { TestDatabase } from "../utils" +import { Redis } from "ioredis" jest.setTimeout(300000) @@ -96,6 +97,20 @@ moduleIntegrationTestRunner({ jest.clearAllMocks() }) + afterAll(async () => { + // empty redis + const connection = new Redis("localhost:6379", { + lazyConnect: true, + }) + + await new Promise(async (resolve) => { + await connection.connect(resolve) + }) + + await connection.flushall() + await connection.disconnect() + }) + let query: RemoteQueryFunction let sharedContainer_: MedusaContainer @@ -152,7 +167,7 @@ moduleIntegrationTestRunner({ describe("Testing basic workflow", function () { describe("Cancel transaction", function () { it("should cancel an ongoing execution with async unfinished yet step", (done) => { - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -219,7 +234,7 @@ moduleIntegrationTestRunner({ it("should cancel a complete execution with a sync workflow running as async", async () => { const workflowId = "workflow-to-cancel-id" + ulid() - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -274,7 +289,7 @@ moduleIntegrationTestRunner({ it("should cancel an ongoing execution with a sync workflow running as async", async () => { const workflowId = "workflow-to-cancel-id" + ulid() - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -329,7 +344,7 @@ moduleIntegrationTestRunner({ }) it("should cancel an ongoing execution with sync steps only", async () => { - const transactionId = "transaction-to-cancel-id" + const transactionId = "transaction-to-cancel-id" + ulid() const step1 = createStep("step1", async () => { return new StepResponse("step1") }) @@ -379,7 +394,7 @@ moduleIntegrationTestRunner({ }) it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { - const transactionId = "concurrency_transaction_id" + const transactionId = "concurrency_transaction_id" + ulid() const workflowId = "concurrency_workflow_id" + ulid() const step1 = createStep("step1", async () => { @@ -433,6 +448,7 @@ moduleIntegrationTestRunner({ expect(executionsList).toHaveLength(1) + console.log(">>>>>>>>> setting step success") const { result } = await workflowOrcModule.setStepSuccess({ idempotencyKey: { action: TransactionHandlerType.INVOKE, @@ -443,6 +459,7 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" 
}, }) + console.log(">>>>>>>>> setting step success done") ;({ data: executionsList } = await query.graph({ entity: "workflow_executions", fields: ["id"], @@ -457,12 +474,13 @@ moduleIntegrationTestRunner({ }) it("should return a list of workflow executions and keep it saved when there is a retentionTime set", async () => { + const transactionId = "transaction_1" + ulid() await workflowOrcModule.run("workflow_2", { input: { value: "123", }, throwOnError: true, - transactionId: "transaction_1", + transactionId, }) let { data: executionsList } = await query.graph({ @@ -477,7 +495,7 @@ moduleIntegrationTestRunner({ action: TransactionHandlerType.INVOKE, stepId: "new_step_name", workflowId: "workflow_2", - transactionId: "transaction_1", + transactionId, }, stepResponse: { uhuuuu: "yeaah!" }, }) @@ -490,11 +508,12 @@ moduleIntegrationTestRunner({ }) it("should return a list of failed workflow executions and keep it saved when there is a retentionTime set", async () => { + const transactionId = "transaction_1" + ulid() await workflowOrcModule.run("workflow_2", { input: { value: "123", }, - transactionId: "transaction_1", + transactionId, }) let { data: executionsList } = await query.graph({ @@ -509,7 +528,7 @@ moduleIntegrationTestRunner({ action: TransactionHandlerType.INVOKE, stepId: "new_step_name", workflowId: "workflow_2", - transactionId: "transaction_1", + transactionId, }, stepResponse: { uhuuuu: "yeaah!" }, options: { @@ -702,11 +721,12 @@ moduleIntegrationTestRunner({ }) it("should revert the entire transaction when a step timeout expires in a async step", async () => { + const transactionId = "transaction_1" + ulid() await workflowOrcModule.run("workflow_step_timeout_async", { input: { myInput: "123", }, - transactionId: "transaction_1", + transactionId, throwOnError: false, }) @@ -718,7 +738,7 @@ moduleIntegrationTestRunner({ input: { myInput: "123", }, - transactionId: "transaction_1", + transactionId, throwOnError: false, } )) as Awaited<{ @@ -771,7 +791,7 @@ moduleIntegrationTestRunner({ }) it("should complete an async workflow that returns a StepResponse", (done) => { - const transactionId = "transaction_1" + const transactionId = "transaction_1" + ulid() workflowOrcModule .run("workflow_async_background", { input: { @@ -801,7 +821,7 @@ moduleIntegrationTestRunner({ }) it("should subscribe to a async workflow and receive the response when it finishes", (done) => { - const transactionId = "trx_123" + const transactionId = "trx_123" + ulid() const onFinish = jest.fn() @@ -830,18 +850,19 @@ moduleIntegrationTestRunner({ }) it("should not skip step if condition is true", function (done) { + const transactionId = "trx_123_when" + ulid() void workflowOrcModule.run("wf-when", { input: { callSubFlow: true, }, - transactionId: "trx_123_when", + transactionId, throwOnError: true, logOnError: true, }) void workflowOrcModule.subscribe({ workflowId: "wf-when", - transactionId: "trx_123_when", + transactionId, subscriber: (event) => { if (event.eventType === "onFinish") { done() @@ -854,12 +875,12 @@ moduleIntegrationTestRunner({ it("should cancel an async sub workflow when compensating", (done) => { const workflowId = "workflow_async_background_fail" - + const transactionId = "trx_123_compensate_async_sub_workflow" + ulid() void workflowOrcModule.run(workflowId, { input: { callSubFlow: true, }, - transactionId: "trx_123_compensate_async_sub_workflow", + transactionId, throwOnError: false, logOnError: false, }) diff --git 
a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index 2860ca1b1f..dc8462e2da 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -38,7 +38,7 @@ moduleIntegrationTestRunner({ testSuite: ({ service: workflowOrcModule, medusaApp }) => { describe("Testing race condition of the workflow during retry", () => { it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" + const transactionId = "transaction_id" + ulid() const workflowId = "workflow-1" + ulid() const subWorkflowId = "sub-" + workflowId @@ -122,8 +122,8 @@ moduleIntegrationTestRunner({ }) it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" - const workflowId = "RACE_workflow-1" + const transactionId = "transaction_id" + ulid() + const workflowId = "RACE_workflow-1" + ulid() const step0InvokeMock = jest.fn() const step0CompensateMock = jest.fn() diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110924.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110924.ts new file mode 100644 index 0000000000..8b52039fa5 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20250819110924.ts @@ -0,0 +1,21 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20250819110924 extends Migration { + override async up(): Promise<void> { + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id_transaction_id" ON "workflow_execution" (workflow_id, transaction_id) WHERE deleted_at IS NULL;` + ) + + this.addSql( + `CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_run_id" ON "workflow_execution" (run_id) WHERE deleted_at IS NULL;` + ) + } + + override async down(): Promise<void> { + this.addSql( + `drop index if exists "IDX_workflow_execution_workflow_id_transaction_id";` + ) + + this.addSql(`drop index if exists "IDX_workflow_execution_run_id";`) + } +} diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 9aa2f5c1af..f25a4bc1cf 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -201,6 +201,68 @@ export class RedisDistributedTransactionStorage } private async saveToDb(data: TransactionCheckpoint, retentionTime?: number) { + const isNotStarted = data.flow.state === TransactionState.NOT_STARTED + const isFinished = [ + TransactionState.DONE, + TransactionState.FAILED, + TransactionState.REVERTED, + ].includes(data.flow.state) + + /** + * Bit of explanation: + * + * When a workflow runs, it runs all sync steps in memory until it reaches an async step. + * In that case, it might hand over to another process to continue the execution. That's why + * we need to save the current state of the flow. From there, it will again run all + * sync steps until the next async step, and so on.
+ * + * To summarize, we only truly need to save the data when we reach a step that + * triggers a handover to a potentially different process. + * + * This allows us to spare some resources and time by not over-communicating with the external + * database when it is not really needed. + */ + + const isFlowInvoking = data.flow.state === TransactionState.INVOKING + + const stepsArray = Object.values(data.flow.steps) as TransactionStep[] + let currentStep!: TransactionStep + + const targetStates = isFlowInvoking + ? [ + TransactionStepState.INVOKING, + TransactionStepState.DONE, + TransactionStepState.FAILED, + ] + : [TransactionStepState.COMPENSATING] + + // Find the current step from the end + for (let i = stepsArray.length - 1; i >= 0; i--) { + const step = stepsArray[i] + + if (step.id === "_root") { + break + } + + const isTargetState = targetStates.includes(step.invoke?.state) + + if (isTargetState) { + currentStep = step + break + } + } + + const currentStepsIsAsync = currentStep + ? stepsArray.some( + (step) => + step?.definition?.async === true && step.depth === currentStep.depth + ) + : false + + if (!(isNotStarted || isFinished) && !currentStepsIsAsync) { + return + } + await this.workflowExecutionService_.upsert([ { workflow_id: data.flow.modelId, @@ -363,10 +425,10 @@ export class RedisDistributedTransactionStorage }) } - const isNotStarted = data.flow.state === TransactionState.NOT_STARTED - const isManualTransactionId = !data.flow.transactionId.startsWith("auto-") // Only set if not exists - const shouldSetNX = isNotStarted && isManualTransactionId + const shouldSetNX = + data.flow.state === TransactionState.NOT_STARTED && + !data.flow.transactionId.startsWith("auto-") // Prepare operations to be executed in batch or pipeline const data_ = {
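A note on the `saveToDb` guard added in both storage providers above: it boils down to a single predicate. The checkpoint is persisted at the lifecycle boundaries (not started, or done/failed/reverted), or when the step the flow is currently positioned on sits at a depth that contains an async step, i.e. a point where execution may be handed over to another process. The sketch below restates that decision as a standalone function; the helper name `shouldPersistCheckpoint` is illustrative, and the import path is assumed to match what the storage files already use.

```ts
import {
  TransactionState,
  TransactionStepState,
  type TransactionCheckpoint,
  type TransactionStep,
} from "@medusajs/framework/orchestration"

// Illustrative only: mirrors the guard added to saveToDb in both providers.
export function shouldPersistCheckpoint(data: TransactionCheckpoint): boolean {
  const state = data.flow.state

  // Always persist at the boundaries of the transaction lifecycle.
  const isNotStarted = state === TransactionState.NOT_STARTED
  const isFinished = [
    TransactionState.DONE,
    TransactionState.FAILED,
    TransactionState.REVERTED,
  ].includes(state)

  if (isNotStarted || isFinished) {
    return true
  }

  const steps = Object.values(data.flow.steps) as TransactionStep[]
  const targetStates =
    state === TransactionState.INVOKING
      ? [
          TransactionStepState.INVOKING,
          TransactionStepState.DONE,
          TransactionStepState.FAILED,
        ]
      : [TransactionStepState.COMPENSATING]

  // Walk backwards to find the step the flow is currently positioned on.
  let currentStep: TransactionStep | undefined
  for (let i = steps.length - 1; i >= 0; i--) {
    const step = steps[i]
    if (step.id === "_root") {
      break
    }
    if (targetStates.includes(step.invoke?.state)) {
      currentStep = step
      break
    }
  }

  if (!currentStep) {
    return false
  }

  // Persist only when the current depth contains an async step, i.e. a point
  // where execution may be handed over to another process.
  const depth = currentStep.depth
  return steps.some(
    (step) => step?.definition?.async === true && step.depth === depth
  )
}
```

Checkpoints that fail this predicate are still kept by the storage provider itself (in memory, or in Redis); only the database write is skipped.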
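The `shouldSetNX` change at the end of the Redis storage diff is the piece that rejects a second concurrent run reusing a manually supplied transaction id: only the very first checkpoint of a `NOT_STARTED` flow whose transaction id does not start with `"auto-"` is written with the NX flag, so a concurrent duplicate finds the key already taken. Below is a rough sketch of that pattern with ioredis; the key format, value shape, and helper name are made up for illustration and are not the module's actual schema.

```ts
import { Redis } from "ioredis"

const redis = new Redis("localhost:6379")

// Illustrative: claim the initial checkpoint with NX so that two concurrent
// runs sharing the same manual transaction id cannot both start the workflow.
async function claimInitialCheckpoint(
  workflowId: string,
  transactionId: string,
  checkpoint: Record<string, unknown>
): Promise<boolean> {
  // Hypothetical key format, for illustration only.
  const key = `dtrans:${workflowId}:${transactionId}`

  // ioredis resolves with "OK" when the key was set, and null when it
  // already existed (i.e. another run already claimed this transaction id).
  const result = await redis.set(key, JSON.stringify(checkpoint), "NX")
  return result === "OK"
}
```

Subsequent saves, and runs with auto-generated transaction ids, skip the NX flag and simply overwrite the stored checkpoint.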