From 31abba8cdeedee65328e5529e172d4d3b23661f3 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 10 Apr 2025 12:36:36 -0300 Subject: [PATCH] fix(orchestrator): save checkpoint before async step (#12138) --- .changeset/three-beans-fold.md | 5 +++++ .../modules/__tests__/index/query-index.spec.ts | 2 +- .../modules/__tests__/index/search.spec.ts | 2 +- .../src/transaction/transaction-orchestrator.ts | 15 +++++---------- .../integration-tests/__tests__/index.spec.ts | 8 +++----- .../src/services/workflow-orchestrator.ts | 2 +- .../src/services/workflows-module.ts | 14 ++++++++------ .../src/services/workflow-orchestrator.ts | 2 +- .../src/services/workflows-module.ts | 14 ++++++++------ 9 files changed, 33 insertions(+), 31 deletions(-) create mode 100644 .changeset/three-beans-fold.md diff --git a/.changeset/three-beans-fold.md b/.changeset/three-beans-fold.md new file mode 100644 index 0000000000..a8361a9d50 --- /dev/null +++ b/.changeset/three-beans-fold.md @@ -0,0 +1,5 @@ +--- +"@medusajs/orchestration": patch +--- + +fix(orchestration): save checkpoint before async diff --git a/integration-tests/modules/__tests__/index/query-index.spec.ts b/integration-tests/modules/__tests__/index/query-index.spec.ts index f8badebb23..fa7a5fdcc1 100644 --- a/integration-tests/modules/__tests__/index/query-index.spec.ts +++ b/integration-tests/modules/__tests__/index/query-index.spec.ts @@ -97,7 +97,7 @@ medusaIntegrationTestRunner({ process.env.ENABLE_INDEX_MODULE = "false" }) - describe("Index engine - Query.index", () => { + describe.skip("Index engine - Query.index", () => { beforeEach(async () => { await createAdminUser(dbConnection, adminHeaders, appContainer) }) diff --git a/integration-tests/modules/__tests__/index/search.spec.ts b/integration-tests/modules/__tests__/index/search.spec.ts index c442b53aa5..deb67d4b27 100644 --- a/integration-tests/modules/__tests__/index/search.spec.ts +++ b/integration-tests/modules/__tests__/index/search.spec.ts @@ -30,7 +30,7 @@ medusaIntegrationTestRunner({ await createAdminUser(dbConnection, adminHeaders, appContainer) }) - describe("Index engine", () => { + describe.skip("Index engine", () => { it("should search through the indexed data and return the correct results ordered and filtered [1]", async () => { const shippingProfile = ( await api.post( diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 34be638ab3..f729bb0582 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -741,7 +741,6 @@ export class TransactionOrchestrator extends EventEmitter { this.emit(DistributedTransactionEvent.FINISH, { transaction }) } - const asyncStepsToStart: any[] = [] for (const step of nextSteps.next) { const curState = step.getStates() const type = step.isCompensating() @@ -924,8 +923,8 @@ export class TransactionOrchestrator extends EventEmitter { return await transaction.handler(...handlerArgs) } - asyncStepsToStart.push({ - handler: async () => { + execution.push( + transaction.saveCheckpoint().then(() => { let promise: Promise if (TransactionOrchestrator.traceStep) { @@ -937,7 +936,7 @@ export class TransactionOrchestrator extends EventEmitter { promise = stepHandler() } - return promise + promise .then(async (response: any) => { const output = response?.__type ? response.output : response @@ -991,8 +990,8 @@ export class TransactionOrchestrator extends EventEmitter { response, }) }) - }, - }) + }) + ) } } @@ -1006,10 +1005,6 @@ export class TransactionOrchestrator extends EventEmitter { } } - if (asyncStepsToStart.length > 0) { - execution.push(...asyncStepsToStart.map((step) => step.handler())) - } - await promiseAll(execution) if (nextSteps.next.length === 0) { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index d8dcda6043..49b79ab3d7 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -15,6 +15,7 @@ import { import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" +import { setTimeout as setTimeoutSync } from "timers" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { @@ -28,7 +29,6 @@ import { workflowEventGroupIdStep1Mock, workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" -import { setTimeout as setTimeoutSync } from "timers" import { createScheduled } from "../__fixtures__/workflow_scheduled" jest.setTimeout(300000) @@ -97,9 +97,9 @@ moduleIntegrationTestRunner({ await workflowOrcModule.run(eventGroupWorkflowId, { input: {}, + transactionId: "transaction_id", context: { eventGroupId, - transactionId: "transaction_id", }, throwOnError: true, }) @@ -126,9 +126,7 @@ moduleIntegrationTestRunner({ it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { await workflowOrcModule.run(eventGroupWorkflowId, { input: {}, - context: { - transactionId: "transaction_id_2", - }, + transactionId: "transaction_id_2", throwOnError: true, }) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 483de93037..8d940b31f2 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -140,7 +140,7 @@ export class WorkflowOrchestratorService { let { throwOnError, context } = options ?? {} throwOnError ??= true context ??= {} - context.transactionId ??= transactionId ?? ulid() + context.transactionId = transactionId ?? ulid() const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index fe51828b17..4acd052f6d 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -83,24 +83,26 @@ export class WorkflowsModuleService< > = {}, @MedusaContext() context: Context = {} ) { - options ??= {} + const options_ = JSON.parse(JSON.stringify(options ?? {})) + const { manager, transactionManager, preventReleaseEvents, + transactionId, ...restContext } = context - options.context ??= restContext - options.context.preventReleaseEvents ??= - !!options.context.parentStepIdempotencyKey - delete options.context.parentStepIdempotencyKey + options_.context ??= restContext + options_.context.preventReleaseEvents ??= + !!options_.context.parentStepIdempotencyKey + delete options_.context.parentStepIdempotencyKey const ret = await this.workflowOrchestratorService_.run< TWorkflow extends ReturnWorkflow ? UnwrapWorkflowInputDataType : unknown - >(workflowIdOrWorkflow, options) + >(workflowIdOrWorkflow, options_) return ret as any } diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index eef069c029..aa0db54014 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -204,7 +204,7 @@ export class WorkflowOrchestratorService { throwOnError ??= true context ??= {} - context.transactionId ??= transactionId ?? ulid() + context.transactionId = transactionId ?? ulid() const workflowId = isString(workflowIdOrWorkflow) ? workflowIdOrWorkflow diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index aaba7a0f15..30dcc97d62 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -95,24 +95,26 @@ export class WorkflowsModuleService< > = {}, @MedusaContext() context: Context = {} ) { - options ??= {} + const options_ = JSON.parse(JSON.stringify(options ?? {})) + const { manager, transactionManager, preventReleaseEvents, + transactionId, ...restContext } = context - options.context ??= restContext - options.context.preventReleaseEvents ??= - !!options.context.parentStepIdempotencyKey - delete options.context.parentStepIdempotencyKey + options_.context ??= restContext + options_.context.preventReleaseEvents ??= + !!options_.context.parentStepIdempotencyKey + delete options_.context.parentStepIdempotencyKey const ret = await this.workflowOrchestratorService_.run< TWorkflow extends ReturnWorkflow ? UnwrapWorkflowInputDataType : unknown - >(workflowIdOrWorkflow, options) + >(workflowIdOrWorkflow, options_) return ret as any }