From ebf33bea43bb6fe03a60a3099b2bc62608f0d13b Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 15 Sep 2025 12:54:57 +0200 Subject: [PATCH] fix(): pipeline missing suites (#13457) * fix(): pipeline missing suites * fix tax integration tests * fix tax integration tests * fix pipeline * fix link integration tests * remove old tests and move current one * fix workflow execution integration tests * fix tests and orchestrator * Fix missing suites in pipeline Remove integration-tests-modules from patch list. --- .changeset/honest-books-fix.md | 7 + .github/workflows/action.yml | 11 +- .../modules/__tests__/link-modules/index.ts | 85 ++--- .../return/create-return-shipping.spec.ts | 2 - .../modules/__tests__/tax/admin/tax.spec.ts | 9 +- .../admin/workflow-executions.spec.ts | 323 ++++++++++++++++++ .../__tests__/workflow-engine/api_v2.ts | 7 - .../__tests__/workflow-engine/tests.ts | 274 --------------- ...flow-engine.ts => workflow-engine.spec.ts} | 0 .../transaction/transaction-orchestrator.ts | 7 + .../integration-tests/__tests__/index.spec.ts | 96 +++--- .../integration-tests/__tests__/index.spec.ts | 100 +++--- 12 files changed, 496 insertions(+), 425 deletions(-) create mode 100644 .changeset/honest-books-fix.md create mode 100644 integration-tests/modules/__tests__/workflow-engine/admin/workflow-executions.spec.ts delete mode 100644 integration-tests/modules/__tests__/workflow-engine/api_v2.ts delete mode 100644 integration-tests/modules/__tests__/workflow-engine/tests.ts rename integration-tests/modules/__tests__/workflow-engine/{workflow-engine.ts => workflow-engine.spec.ts} (100%) diff --git a/.changeset/honest-books-fix.md b/.changeset/honest-books-fix.md new file mode 100644 index 0000000000..31c66df257 --- /dev/null +++ b/.changeset/honest-books-fix.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +--- + +fix(): pipeline missing suites diff --git a/.github/workflows/action.yml b/.github/workflows/action.yml index eb98c80068..f04e3707ed 100644 --- a/.github/workflows/action.yml +++ b/.github/workflows/action.yml @@ -205,13 +205,22 @@ jobs: name: Module Integration Tests - Shard ${{ matrix.shard_index }} strategy: matrix: - shard_index: [1, 2] + shard_index: [1, 2, 3] runs-on: ubuntu-latest env: TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} TURBO_TEAM: ${{ secrets.TURBO_TEAM }} services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 1s + --health-timeout 10s + --health-retries 10 + ports: + - 6379:6379 postgres: image: postgres env: diff --git a/integration-tests/modules/__tests__/link-modules/index.ts b/integration-tests/modules/__tests__/link-modules/index.ts index 4eb5d71b0c..115fc6e4bf 100644 --- a/integration-tests/modules/__tests__/link-modules/index.ts +++ b/integration-tests/modules/__tests__/link-modules/index.ts @@ -103,47 +103,50 @@ medusaIntegrationTestRunner({ const values = await links.linkServiceName.list() - expect(values).toEqual([ - { - product_id: "modA_id", - inventory_item_id: "modB_id", - id: expect.stringMatching("prefix_.+"), - extra_field: -1, - another_field: null, - created_at: expect.any(Date), - updated_at: expect.any(Date), - deleted_at: null, - }, - expect.objectContaining({ - product_id: "123", - inventory_item_id: "abc", - id: expect.stringMatching("prefix_.+"), - extra_field: 333, - another_field: "value**", - }), - expect.objectContaining({ - product_id: "111", - inventory_item_id: "aaa", - extra_field: -1, - another_field: "test", - }), - expect.objectContaining({ - product_id: "222", - inventory_item_id: "bbb", - extra_field: -1, - another_field: null, - }), - expect.objectContaining({ - product_id: "333", - inventory_item_id: "ccc", - id: expect.stringMatching("prefix_.+"), - extra_field: 2, - }), - expect.objectContaining({ - product_id: "444", - inventory_item_id: "bbb", - }), - ]) + expect(values).toHaveLength(6) + expect(values).toEqual( + expect.arrayContaining([ + { + product_id: "modA_id", + inventory_item_id: "modB_id", + id: expect.stringMatching("prefix_.+"), + extra_field: -1, + another_field: null, + created_at: expect.any(Date), + updated_at: expect.any(Date), + deleted_at: null, + }, + expect.objectContaining({ + product_id: "123", + inventory_item_id: "abc", + id: expect.stringMatching("prefix_.+"), + extra_field: 333, + another_field: "value**", + }), + expect.objectContaining({ + product_id: "111", + inventory_item_id: "aaa", + extra_field: -1, + another_field: "test", + }), + expect.objectContaining({ + product_id: "222", + inventory_item_id: "bbb", + extra_field: -1, + another_field: null, + }), + expect.objectContaining({ + product_id: "333", + inventory_item_id: "ccc", + id: expect.stringMatching("prefix_.+"), + extra_field: 2, + }), + expect.objectContaining({ + product_id: "444", + inventory_item_id: "bbb", + }), + ]) + ) }) it("Should dismiss the link of a given pair of keys", async function () { diff --git a/integration-tests/modules/__tests__/order/workflows/return/create-return-shipping.spec.ts b/integration-tests/modules/__tests__/order/workflows/return/create-return-shipping.spec.ts index 5db55e8983..e0690b5e04 100644 --- a/integration-tests/modules/__tests__/order/workflows/return/create-return-shipping.spec.ts +++ b/integration-tests/modules/__tests__/order/workflows/return/create-return-shipping.spec.ts @@ -176,8 +176,6 @@ medusaIntegrationTestRunner({ }, }) - console.log(result.items[0].actions) - let updatedShippingMethod = result.shipping_methods?.find( (sm) => sm.shipping_option_id === shippingOptionId ) diff --git a/integration-tests/modules/__tests__/tax/admin/tax.spec.ts b/integration-tests/modules/__tests__/tax/admin/tax.spec.ts index 868eedf628..422aa920c8 100644 --- a/integration-tests/modules/__tests__/tax/admin/tax.spec.ts +++ b/integration-tests/modules/__tests__/tax/admin/tax.spec.ts @@ -109,6 +109,7 @@ medusaIntegrationTestRunner({ `/admin/tax-regions`, { country_code: "us", + provider_id: "tp_system", default_tax_rate: { code: "default", rate: 2, @@ -131,7 +132,7 @@ medusaIntegrationTestRunner({ updated_at: expect.any(String), deleted_at: null, created_by: expect.any(String), - provider_id: null, + provider_id: "tp_system", metadata: null, children: [], parent: null, @@ -275,6 +276,7 @@ medusaIntegrationTestRunner({ `/admin/tax-regions`, { country_code: "us", + provider_id: "tp_system", default_tax_rate: { code: "default", rate: 2, @@ -297,7 +299,7 @@ medusaIntegrationTestRunner({ updated_at: expect.any(String), deleted_at: null, created_by: expect.any(String), - provider_id: null, + provider_id: "tp_system", metadata: null, children: [], parent: null, @@ -385,6 +387,7 @@ medusaIntegrationTestRunner({ `/admin/tax-regions`, { country_code: "us", + provider_id: "tp_system", default_tax_rate: { code: "default", rate: 2, @@ -495,6 +498,7 @@ medusaIntegrationTestRunner({ `/admin/tax-regions`, { country_code: "us", + provider_id: "tp_system", default_tax_rate: { code: "default", rate: 2, @@ -531,6 +535,7 @@ medusaIntegrationTestRunner({ `/admin/tax-regions`, { country_code: "us", + provider_id: "tp_system", default_tax_rate: { code: "default", rate: 2, diff --git a/integration-tests/modules/__tests__/workflow-engine/admin/workflow-executions.spec.ts b/integration-tests/modules/__tests__/workflow-engine/admin/workflow-executions.spec.ts new file mode 100644 index 0000000000..e93af3593b --- /dev/null +++ b/integration-tests/modules/__tests__/workflow-engine/admin/workflow-executions.spec.ts @@ -0,0 +1,323 @@ +import { + createStep, + createWorkflow, + StepResponse, + WorkflowData, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" +import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import { createAdminUser } from "../../../../helpers/create-admin-user" +import { setTimeout } from "timers/promises" + +jest.setTimeout(100000) + +/** + * finish the workflow to prevent the runner from waiting for the end of the execution permanently + */ +async function finishWorkflows( + api: any, + { + stepId, + response, + executions, + }: { + stepId: string + response?: any + executions: { transaction_id: string }[] + } +) { + const promises: any[] = [] + for (const execution of executions) { + promises.push( + api.post( + `/admin/workflows-executions/my-workflow-name/steps/success`, + { + transaction_id: execution.transaction_id, + step_id: stepId, + response: response || { + all: "good", + }, + }, + adminHeaders + ) + ) + } + + await Promise.all(promises) +} + +const adminHeaders = { + headers: { + "x-medusa-access-token": "test_token", + }, +} + +medusaIntegrationTestRunner({ + testSuite: ({ dbConnection, getContainer, api }) => { + describe("Workflow Engine API", () => { + let medusaContainer + + beforeAll(() => { + medusaContainer = getContainer() + + const step1 = createStep( + { + name: "my-step", + }, + async (input: { initial: string }) => { + return new StepResponse({ + result: input.initial, + }) + } + ) + const step2 = createStep( + { + name: "my-step-async", + async: true, + }, + async () => {} + ) + + createWorkflow( + { + name: "my-workflow-name", + retentionTime: 1000, + }, + function (input: WorkflowData<{ initial: string }>) { + step1(input) + const res = step2() + return new WorkflowResponse(res) + } + ) + }) + + beforeEach(async () => { + await createAdminUser(dbConnection, adminHeaders, medusaContainer) + }) + + it("Should list all workflows in execution or completed and retrieve them by id", async () => { + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows-executions`, + adminHeaders + ) + + expect(executions.data.count).toEqual(3) + expect(executions.data.workflow_executions.length).toEqual(3) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.any(String), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + + const retrivedById = await api.get( + `/admin/workflows-executions/` + + executions.data.workflow_executions[0].id, + adminHeaders + ) + + expect(retrivedById.data.workflow_execution).toEqual( + expect.objectContaining(executions.data.workflow_executions[0]) + ) + + await finishWorkflows(api, { + stepId: "my-step-async", + executions: executions.data.workflow_executions, + }) + }) + + it("Should list all workflows matching the filters", async () => { + const promises: any[] = [] + const transactions: string[] = [] + + for (let i = 3; i--; ) { + const transactionId = "transaction_" + (i + 1) + transactions.push(transactionId) + promises.push( + api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: transactionId, + }, + adminHeaders + ) + ) + } + + await Promise.all(promises) + + const executions = await api.get( + `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + adminHeaders + ) + + expect(executions.data.count).toEqual(2) + expect(executions.data.workflow_executions.length).toEqual(2) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching( + new RegExp(transactions.join("|")) + ), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + expect(executions.data.workflow_executions[1]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching( + new RegExp(transactions.join("|")) + ), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + + await finishWorkflows(api, { + stepId: "my-step-async", + executions: transactions.map((transactionId) => ({ + transaction_id: transactionId, + })), + }) + + console.log("finished") + }) + + it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { + const wf = await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "trx_123", + }, + adminHeaders + ) + + expect(wf.data).toEqual({ + acknowledgement: { + transactionId: "trx_123", + workflowId: "my-workflow-name", + hasFailed: false, + hasFinished: false, + }, + }) + + await setTimeout(2000) + + const execution = await api.get( + `/admin/workflows-executions/my-workflow-name/${wf.data.acknowledgement.transactionId}`, + adminHeaders + ) + + expect(execution.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + id: expect.any(String), + state: "invoking", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: true, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: { + "my-step": { + __type: "Symbol(WorkflowWorkflowData)", + output: { + __type: "Symbol(WorkflowStepResponse)", + output: { + result: "abc", + }, + compensateInput: { + result: "abc", + }, + }, + }, + }, + payload: { + initial: "abc", + }, + }), + }), + }), + }) + + const respondAsync = await api.post( + `/admin/workflows-executions/my-workflow-name/steps/success`, + { + transaction_id: "trx_123", + step_id: "my-step-async", + response: { + all: "good", + }, + }, + adminHeaders + ) + + expect(respondAsync.data.success).toEqual(true) + + const completed = await api.get( + `/admin/workflows-executions/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(completed.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + state: "done", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: expect.objectContaining({ + "my-step-async": { + __type: "Symbol(WorkflowStepResponse)", + output: { + all: "good", + }, + compensateInput: { + all: "good", + }, + }, + }), + }), + }), + }), + }) + }) + }) + }, +}) diff --git a/integration-tests/modules/__tests__/workflow-engine/api_v2.ts b/integration-tests/modules/__tests__/workflow-engine/api_v2.ts deleted file mode 100644 index 49ac9b3df0..0000000000 --- a/integration-tests/modules/__tests__/workflow-engine/api_v2.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { workflowEngineTestSuite } from "./tests" - -jest.setTimeout(5000000) - -const env = {} - -workflowEngineTestSuite(env) diff --git a/integration-tests/modules/__tests__/workflow-engine/tests.ts b/integration-tests/modules/__tests__/workflow-engine/tests.ts deleted file mode 100644 index 12e3c87275..0000000000 --- a/integration-tests/modules/__tests__/workflow-engine/tests.ts +++ /dev/null @@ -1,274 +0,0 @@ -import { - createStep, - createWorkflow, - StepResponse, - WorkflowData, -} from "@medusajs/framework/workflows-sdk" -import { medusaIntegrationTestRunner } from "@medusajs/test-utils" -import { createAdminUser } from "../../../helpers/create-admin-user" - -export const workflowEngineTestSuite = ( - env, - extraParams: { force_modules_migration?: boolean } = {} -) => { - const adminHeaders = { - headers: { - "x-medusa-access-token": "test_token", - }, - } - - return medusaIntegrationTestRunner({ - env, - force_modules_migration: extraParams.force_modules_migration, - testSuite: ({ dbConnection, getContainer, api }) => { - describe("Workflow Engine API", () => { - let medusaContainer - - beforeAll(() => { - medusaContainer = getContainer() - }) - - beforeEach(async () => { - await createAdminUser(dbConnection, adminHeaders, medusaContainer) - }) - - describe("running workflows", () => { - beforeAll(async () => { - const step1 = createStep( - { - name: "my-step", - }, - async (input: { initial: string }) => { - return new StepResponse({ - result: input.initial, - }) - } - ) - const step2 = createStep( - { - name: "my-step-async", - async: true, - }, - async () => {} - ) - - createWorkflow( - { - name: "my-workflow-name", - retentionTime: 1000, - }, - function (input: WorkflowData<{ initial: string }>) { - step1(input) - return step2() - } - ) - }) - - it("Should list all workflows in execution or completed and retrieve them by id", async () => { - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions`, - adminHeaders - ) - - expect(executions.data.count).toEqual(3) - expect(executions.data.workflow_executions.length).toEqual(3) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.any(String), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - - const retrivedById = await api.get( - `/admin/workflows-executions/` + - executions.data.workflow_executions[0].id, - adminHeaders - ) - expect(retrivedById.data.workflow_execution).toEqual( - expect.objectContaining(executions.data.workflow_executions[0]) - ) - }) - - it("Should list all workflows matching the filters", async () => { - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "transaction_" + (i + 1), - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, - adminHeaders - ) - - expect(executions.data.count).toEqual(2) - expect(executions.data.workflow_executions.length).toEqual(2) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching( - /transaction_1|transaction_2/ - ), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - expect(executions.data.workflow_executions[1]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching( - /transaction_1|transaction_2/ - ), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - }) - - it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { - const wf = await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "trx_123", - }, - adminHeaders - ) - - expect(wf.data).toEqual({ - acknowledgement: { - transactionId: "trx_123", - workflowId: "my-workflow-name", - hasFailed: false, - hasFinished: false, - }, - }) - - const execution = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(execution.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - id: expect.any(String), - state: "invoking", - execution: expect.objectContaining({ - hasAsyncSteps: true, - hasFailedSteps: false, - hasSkippedSteps: false, - hasWaitingSteps: true, - hasRevertedSteps: false, - }), - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: { - "my-step": { - __type: "Symbol(WorkflowWorkflowData)", - output: { - __type: "Symbol(WorkflowStepResponse)", - output: { - result: "abc", - }, - compensateInput: { - result: "abc", - }, - }, - }, - }, - payload: { - initial: "abc", - }, - }), - }), - }), - }) - - const respondAsync = await api.post( - `/admin/workflows-executions/my-workflow-name/steps/success`, - { - transaction_id: "trx_123", - step_id: "my-step-async", - response: { - all: "good", - }, - }, - adminHeaders - ) - - expect(respondAsync.data.success).toEqual(true) - - const completed = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(completed.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - state: "done", - execution: expect.objectContaining({ - hasAsyncSteps: true, - hasFailedSteps: false, - hasSkippedSteps: false, - hasWaitingSteps: false, - hasRevertedSteps: false, - }), - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: expect.objectContaining({ - "my-step-async": { - __type: "Symbol(WorkflowStepResponse)", - output: { - all: "good", - }, - compensateInput: { - all: "good", - }, - }, - }), - }), - }), - }), - }) - }) - }) - }) - }, - }) -} - -describe("Noop test", () => { - it("noop check", async () => { - expect(true).toBe(true) - }) -}) diff --git a/integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts b/integration-tests/modules/__tests__/workflow-engine/workflow-engine.spec.ts similarity index 100% rename from integration-tests/modules/__tests__/workflow-engine/workflow-engine.ts rename to integration-tests/modules/__tests__/workflow-engine/workflow-engine.spec.ts diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 9f346afa4f..8ddfc2eeb5 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -961,6 +961,13 @@ export class TransactionOrchestrator extends EventEmitter { if (nextSteps.next.length === 0 || (hasAsyncSteps && !execution.length)) { continueExecution = false + await transaction.saveCheckpoint().catch((error) => { + if (TransactionOrchestrator.isExpectedError(error)) { + return + } + + throw error + }) } } } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 32e7d22c76..8543e93c26 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -447,63 +447,61 @@ moduleIntegrationTestRunner({ ) }) - it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { - const transactionId = "transaction-auto-retries" + ulid() - const workflowId = "workflow_1_auto_retries_false" + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", (done) => { + ;(async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - let lastExepectHaveBeenCalledTimes = 0 + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) + expect( + step1CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + done() + } + }, + }) - workflowOrcModule.subscribe({ - workflowId, - transactionId, - subscriber: async (event) => { - if (event.eventType === "onFinish") { - lastExepectHaveBeenCalledTimes = 1 - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( - 1 - ) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes( - 1 - ) - } - }, - }) + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + await setTimeoutPromise(2000) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - await setTimeoutPromise(1000) + await setTimeoutPromise(2000) - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) - - await setTimeoutPromise(1000) - - expect(lastExepectHaveBeenCalledTimes).toEqual(1) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + })() }) it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => { diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index dbc441d4a0..723e8dd384 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -485,63 +485,65 @@ moduleIntegrationTestRunner({ ) }) - it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", async () => { - const transactionId = "transaction-auto-retries" + ulid() - const workflowId = "workflow_1_auto_retries_false" + it("should not retry steps X times automatically when maxRetries is set and autoRetry is false", (done) => { + ;(async () => { + const transactionId = "transaction-auto-retries" + ulid() + const workflowId = "workflow_1_auto_retries_false" - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - let lastExepectHaveBeenCalledTimes = 0 + workflowOrcModule.subscribe({ + workflowId, + transactionId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes( + 1 + ) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes( + 3 + ) + expect( + step1CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + expect( + step2CompensateMockAutoRetriesFalse + ).toHaveBeenCalledTimes(1) + done() + } + }, + }) - workflowOrcModule.subscribe({ - workflowId, - transactionId, - subscriber: async (event) => { - if (event.eventType === "onFinish") { - lastExepectHaveBeenCalledTimes = 1 - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(3) - expect( - step1CompensateMockAutoRetriesFalse - ).toHaveBeenCalledTimes(1) - expect( - step2CompensateMockAutoRetriesFalse - ).toHaveBeenCalledTimes(1) - } - }, - }) + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + await setTimeout(2000) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) - await setTimeout(4000) + await setTimeout(2000) - expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) - expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) - expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step1InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(1) + expect(step2InvokeMockAutoRetriesFalse).toHaveBeenCalledTimes(2) + expect(step1CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) + expect(step2CompensateMockAutoRetriesFalse).toHaveBeenCalledTimes(0) - await workflowOrcModule.run(workflowId, { - input: {}, - transactionId, - throwOnError: false, - }) - - await setTimeout(4000) - - expect(lastExepectHaveBeenCalledTimes).toEqual(1) + await workflowOrcModule.run(workflowId, { + input: {}, + transactionId, + throwOnError: false, + }) + })() }) it("should prevent executing twice the same workflow in perfect concurrency with the same transactionId and non idempotent and not async but retention time is set", async () => {