diff --git a/packages/core/core-flows/src/order/workflows/cancel-order.ts b/packages/core/core-flows/src/order/workflows/cancel-order.ts index 3ee16356ab..6396877026 100644 --- a/packages/core/core-flows/src/order/workflows/cancel-order.ts +++ b/packages/core/core-flows/src/order/workflows/cancel-order.ts @@ -71,17 +71,6 @@ const validateOrder = createStep( const notCanceled = (o) => !o.canceled_at throwErrorIf(order_.fulfillments, notCanceled, "fulfillments") - /* - TODO: relationship between order and returns, swaps, claims - - throwErrorIf( - order_.returns, - (ret) => ret.status !== "canceled", - "returns" - ) - throwErrorIf(order_.swaps, notCanceled, "swaps") - throwErrorIf(order_.claims, notCanceled, "claims") - */ } ) diff --git a/packages/core/modules-sdk/src/medusa-app.ts b/packages/core/modules-sdk/src/medusa-app.ts index 04fee33aa2..c10487eb3a 100644 --- a/packages/core/modules-sdk/src/medusa-app.ts +++ b/packages/core/modules-sdk/src/medusa-app.ts @@ -226,6 +226,7 @@ export type MedusaAppOutput = { revertMigrations: RunMigrationFn onApplicationShutdown: () => Promise onApplicationPrepareShutdown: () => Promise + sharedContainer?: MedusaContainer } export type MedusaAppOptions = { @@ -470,6 +471,7 @@ async function MedusaApp_({ notFound, runMigrations, revertMigrations, + sharedContainer: sharedContainer_, } } diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index cd5addce28..5b5048909c 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -498,10 +498,10 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(4) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(4) expect(transaction.getState()).toBe(TransactionState.REVERTED) - expect(mocks.compensateOne).toBeCalledTimes(1) + expect(mocks.compensateOne).toHaveBeenCalledTimes(1) expect(mocks.two).nthCalledWith( 1, @@ -561,7 +561,7 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(mocks.one).toBeCalledTimes(2) + expect(mocks.one).toHaveBeenCalledTimes(2) expect(transaction.getState()).toBe(TransactionState.FAILED) }) @@ -617,8 +617,8 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(2) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(2) expect(transaction.getState()).toBe(TransactionState.DONE) expect(transaction.isPartiallyCompleted).toBe(true) }) @@ -677,8 +677,8 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(0) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(0) expect(transaction.getState()).toBe(TransactionState.INVOKING) expect(transaction.getFlow().hasWaitingSteps).toBe(true) @@ -771,9 +771,9 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.compensateOne).toBeCalledTimes(0) - expect(mocks.two).toBeCalledTimes(0) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.compensateOne).toHaveBeenCalledTimes(0) + expect(mocks.two).toHaveBeenCalledTimes(0) const registerBeforeAllowed = await strategy .registerStepSuccess(mockSecondStepId, handler) @@ -791,7 +791,7 @@ describe("Transaction Orchestrator", () => { ) expect(resumedTransaction.getState()).toBe(TransactionState.COMPENSATING) - expect(mocks.compensateOne).toBeCalledTimes(1) + expect(mocks.compensateOne).toHaveBeenCalledTimes(1) const mocktransactionIdCompensate = TransactionOrchestrator.getKeyName( "transaction-name", @@ -881,11 +881,11 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.compensateOne).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(1) - expect(mocks.compensateTwo).toBeCalledTimes(1) - expect(mocks.three).toBeCalledTimes(1) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.compensateOne).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(1) + expect(mocks.compensateTwo).toHaveBeenCalledTimes(1) + expect(mocks.three).toHaveBeenCalledTimes(1) expect(transaction.getState()).toBe(TransactionState.REVERTED) }) @@ -951,16 +951,16 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.getState()).toBe(TransactionState.DONE) - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(1) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(1) await strategy.cancelTransaction(transaction) expect(transaction.getState()).toBe(TransactionState.REVERTED) - expect(mocks.one).toBeCalledTimes(1) - expect(mocks.two).toBeCalledTimes(1) - expect(mocks.oneCompensate).toBeCalledTimes(1) - expect(mocks.twoCompensate).toBeCalledTimes(1) + expect(mocks.one).toHaveBeenCalledTimes(1) + expect(mocks.two).toHaveBeenCalledTimes(1) + expect(mocks.oneCompensate).toHaveBeenCalledTimes(1) + expect(mocks.twoCompensate).toHaveBeenCalledTimes(1) }) it("Should receive the current transaction as reference in the handler", async () => { @@ -1081,10 +1081,10 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.f1).toBeCalledTimes(2) - expect(mocks.f2).toBeCalledTimes(2) - expect(mocks.f3).toBeCalledTimes(2) - expect(mocks.f4).toBeCalledTimes(0) + expect(mocks.f1).toHaveBeenCalledTimes(2) + expect(mocks.f2).toHaveBeenCalledTimes(2) + expect(mocks.f3).toHaveBeenCalledTimes(2) + expect(mocks.f4).toHaveBeenCalledTimes(0) expect(transaction.getContext().invoke.action1).toBe("content f1") expect(transaction.getContext().invoke.action2).toBe("delayed content f2") expect(transaction.getContext().invoke.action3).toBe("content f3") @@ -1187,10 +1187,10 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.f1).toBeCalledTimes(1) - expect(mocks.f2).toBeCalledTimes(1) - expect(mocks.f3).toBeCalledTimes(1) - expect(mocks.f4).toBeCalledTimes(0) + expect(mocks.f1).toHaveBeenCalledTimes(1) + expect(mocks.f2).toHaveBeenCalledTimes(1) + expect(mocks.f3).toHaveBeenCalledTimes(1) + expect(mocks.f4).toHaveBeenCalledTimes(0) expect(transaction.getContext().invoke.action1).toBe("content f1") expect(transaction.getContext().invoke.action2).toBe("delayed content f2") expect(transaction.getContext().invoke.action3).toBe("content f3") @@ -1304,10 +1304,10 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.f1).toBeCalledTimes(2) - expect(mocks.f2).toBeCalledTimes(2) - expect(mocks.f3).toBeCalledTimes(2) - expect(mocks.f4).toBeCalledTimes(0) + expect(mocks.f1).toHaveBeenCalledTimes(2) + expect(mocks.f2).toHaveBeenCalledTimes(2) + expect(mocks.f3).toHaveBeenCalledTimes(2) + expect(mocks.f4).toHaveBeenCalledTimes(0) expect(transaction.getContext().invoke.action1).toBe("content f1") expect(transaction.getContext().invoke.action2).toBe("delayed content f2") expect(transaction.getContext().invoke.action3).toBe("content f3") @@ -1413,10 +1413,10 @@ describe("Transaction Orchestrator", () => { await strategy.resume(transaction) expect(transaction.transactionId).toBe("transaction_id_123") - expect(mocks.f1).toBeCalledTimes(2) - expect(mocks.f2).toBeCalledTimes(2) - expect(mocks.f3).toBeCalledTimes(2) - expect(mocks.f4).toBeCalledTimes(0) + expect(mocks.f1).toHaveBeenCalledTimes(2) + expect(mocks.f2).toHaveBeenCalledTimes(2) + expect(mocks.f3).toHaveBeenCalledTimes(2) + expect(mocks.f4).toHaveBeenCalledTimes(0) expect(transaction.getContext().invoke.action1).toBe("content f1") expect(transaction.getContext().invoke.action2).toBe("delayed content f2") expect(transaction.getContext().invoke.action3).toBe("content f3") diff --git a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts index de8473febe..1649efa4e2 100644 --- a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts @@ -3,7 +3,7 @@ import { TransactionCheckpoint, } from "../distributed-transaction" import { TransactionStep } from "../transaction-step" -import { SchedulerOptions } from "../types" +import { SchedulerOptions, TransactionOptions } from "../types" export interface IDistributedSchedulerStorage { schedule( @@ -17,9 +17,17 @@ export interface IDistributedSchedulerStorage { } export interface IDistributedTransactionStorage { - get(key: string): Promise + get( + key: string, + options?: TransactionOptions + ): Promise list(): Promise - save(key: string, data: TransactionCheckpoint, ttl?: number): Promise + save( + key: string, + data: TransactionCheckpoint, + ttl?: number, + options?: TransactionOptions + ): Promise scheduleRetry( transaction: DistributedTransaction, step: TransactionStep, diff --git a/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts b/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts index 23ac1438cb..b6745873ac 100644 --- a/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts @@ -1,5 +1,6 @@ import { TransactionState } from "@medusajs/utils" import { TransactionCheckpoint } from "../distributed-transaction" +import { TransactionOptions } from "../types" import { DistributedTransactionStorage } from "./abstract-storage" // eslint-disable-next-line max-len @@ -11,7 +12,10 @@ export class BaseInMemoryDistributedTransactionStorage extends DistributedTransa this.storage = new Map() } - async get(key: string): Promise { + async get( + key: string, + options?: TransactionOptions + ): Promise { return this.storage.get(key) } @@ -22,7 +26,8 @@ export class BaseInMemoryDistributedTransactionStorage extends DistributedTransa async save( key: string, data: TransactionCheckpoint, - ttl?: number + ttl?: number, + options?: TransactionOptions ): Promise { const hasFinished = [ TransactionState.DONE, diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index 66b45de671..a38a84f67d 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -7,11 +7,7 @@ import { TransactionOrchestrator, } from "./transaction-orchestrator" import { TransactionStep, TransactionStepHandler } from "./transaction-step" -import { - SchedulerOptions, - TransactionHandlerType, - TransactionState, -} from "./types" +import { TransactionHandlerType, TransactionState } from "./types" /** * @typedef TransactionMetadata @@ -90,7 +86,7 @@ export class DistributedTransaction extends EventEmitter { this.keyValueStore = storage } - public static keyPrefix = "dtrans" + public static keyPrefix = "dtrx" constructor( private flow: TransactionFlow, @@ -191,7 +187,10 @@ export class DistributedTransaction extends EventEmitter { public async saveCheckpoint( ttl = 0 ): Promise { - const options = this.getFlow().options + const options = + TransactionOrchestrator.getWorkflowOptions(this.modelId) ?? + this.getFlow().options + if (!options?.store) { return } @@ -207,7 +206,7 @@ export class DistributedTransaction extends EventEmitter { this.modelId, this.transactionId ) - await DistributedTransaction.keyValueStore.save(key, data, ttl) + await DistributedTransaction.keyValueStore.save(key, data, ttl, options) return data } @@ -222,7 +221,11 @@ export class DistributedTransaction extends EventEmitter { transactionId ) - const loadedData = await DistributedTransaction.keyValueStore.get(key) + const options = TransactionOrchestrator.getWorkflowOptions(modelId) + const loadedData = await DistributedTransaction.keyValueStore.get( + key, + options + ) if (loadedData) { return loadedData } diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 0dd34002cf..71ab04ac3b 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -6,8 +6,10 @@ import { import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { DistributedTransactionEvent, + StepFeatures, TransactionHandlerType, TransactionModelOptions, + TransactionOptions, TransactionState, TransactionStepsDefinition, TransactionStepStatus, @@ -56,12 +58,22 @@ export class TransactionOrchestrator extends EventEmitter { private compensateSteps: string[] = [] public static DEFAULT_RETRIES = 0 + + private static workflowOptions: { + [modelId: string]: TransactionOptions + } = {} + + public static getWorkflowOptions(modelId: string): TransactionOptions { + return this.workflowOptions[modelId] + } + constructor( public id: string, private definition: TransactionStepsDefinition, private options?: TransactionModelOptions ) { super() + this.parseFlowOptions() } private static SEPARATOR = ":" @@ -409,6 +421,7 @@ export class TransactionOrchestrator extends EventEmitter { } const flow = transaction.getFlow() + const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) if (!hasStepTimedOut) { step.changeStatus(TransactionStepStatus.OK) @@ -420,7 +433,7 @@ export class TransactionOrchestrator extends EventEmitter { step.changeState(TransactionStepState.DONE) } - if (step.definition.async || flow.options?.storeExecution) { + if (step.definition.async || options?.storeExecution) { await transaction.saveCheckpoint() } @@ -497,6 +510,8 @@ export class TransactionOrchestrator extends EventEmitter { } const flow = transaction.getFlow() + const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) + const cleaningUp: Promise[] = [] const hasTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -536,7 +551,7 @@ export class TransactionOrchestrator extends EventEmitter { } } - if (step.definition.async || flow.options?.storeExecution) { + if (step.definition.async || options?.storeExecution) { await transaction.saveCheckpoint() } @@ -563,6 +578,7 @@ export class TransactionOrchestrator extends EventEmitter { } const flow = transaction.getFlow() + const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) const nextSteps = await this.checkAllSteps(transaction) const execution: Promise[] = [] @@ -764,7 +780,7 @@ export class TransactionOrchestrator extends EventEmitter { } } - if (hasSyncSteps && flow.options?.storeExecution) { + if (hasSyncSteps && options?.storeExecution) { await transaction.saveCheckpoint() } @@ -798,7 +814,7 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.INVOKING flow.startedAt = Date.now() - if (this.options?.store) { + if (this.getOptions().store) { await transaction.saveCheckpoint( flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL ) @@ -843,7 +859,7 @@ export class TransactionOrchestrator extends EventEmitter { await this.executeNext(transaction) } - private createTransactionFlow(transactionId: string, flowMetadata?: TransactionFlow['metadata']): TransactionFlow { + private parseFlowOptions() { const [steps, features] = TransactionOrchestrator.buildSteps( this.definition ) @@ -854,22 +870,47 @@ export class TransactionOrchestrator extends EventEmitter { const hasStepTimeouts = features.hasStepTimeouts const hasRetriesTimeout = features.hasRetriesTimeout const hasTransactionTimeout = !!this.options.timeout + const isIdempotent = !!this.options.idempotent if (hasAsyncSteps) { this.options.store = true } - if (hasStepTimeouts || hasRetriesTimeout || hasTransactionTimeout) { + if ( + hasStepTimeouts || + hasRetriesTimeout || + hasTransactionTimeout || + isIdempotent + ) { this.options.store = true this.options.storeExecution = true } + const parsedOptions = { + ...this.options, + hasAsyncSteps, + hasStepTimeouts, + hasRetriesTimeout, + } + TransactionOrchestrator.workflowOptions[this.id] = parsedOptions + + return [steps, features] + } + + private createTransactionFlow( + transactionId: string, + flowMetadata?: TransactionFlow["metadata"] + ): TransactionFlow { + const [steps, features] = TransactionOrchestrator.buildSteps( + this.definition + ) + const flow: TransactionFlow = { modelId: this.id, options: this.options, transactionId: transactionId, metadata: flowMetadata, - hasAsyncSteps, + hasAsyncSteps: features.hasAsyncSteps, hasFailedSteps: false, hasSkippedSteps: false, hasWaitingSteps: false, @@ -909,14 +950,7 @@ export class TransactionOrchestrator extends EventEmitter { private static buildSteps( flow: TransactionStepsDefinition, existingSteps?: { [key: string]: TransactionStep } - ): [ - { [key: string]: TransactionStep }, - { - hasAsyncSteps: boolean - hasStepTimeouts: boolean - hasRetriesTimeout: boolean - } - ] { + ): [{ [key: string]: TransactionStep }, StepFeatures] { const states: { [key: string]: TransactionStep } = { [TransactionOrchestrator.ROOT_STEP]: { id: TransactionOrchestrator.ROOT_STEP, @@ -938,12 +972,7 @@ export class TransactionOrchestrator extends EventEmitter { while (queue.length > 0) { const { obj, level } = queue.shift() - for (const key in obj) { - // eslint-disable-next-line no-prototype-builtins - if (!obj.hasOwnProperty(key)) { - continue - } - + for (const key of Object.keys(obj)) { if (typeof obj[key] === "object" && obj[key] !== null) { queue.push({ obj: obj[key], level: [...level] }) } else if (key === "action") { @@ -1039,7 +1068,11 @@ export class TransactionOrchestrator extends EventEmitter { existingTransaction?.context ) - if (newTransaction && this.options?.store && this.options?.storeExecution) { + if ( + newTransaction && + this.getOptions().store && + this.getOptions().storeExecution + ) { await transaction.saveCheckpoint( modelFlow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL ) diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index 1e744045d6..7fae4466e1 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -118,6 +118,11 @@ export type TransactionModelOptions = { */ storeExecution?: boolean + /** + * If true, the workflow will use the transaction ID as the key to ensure only-once execution + */ + idempotent?: boolean + /** * Defines the workflow as a scheduled workflow that executes based on the cron configuration passed. * The value can either by a cron expression string, or an object that also allows to define the concurrency behavior. @@ -207,3 +212,11 @@ export type DistributedTransactionEvents = { transaction: DistributedTransaction }) => void } + +export type StepFeatures = { + hasAsyncSteps: boolean + hasStepTimeouts: boolean + hasRetriesTimeout: boolean +} + +export type TransactionOptions = TransactionModelOptions & StepFeatures diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts index c5e62ad463..ee3382b3c3 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/index.ts @@ -1,5 +1,6 @@ export * from "./workflow_1" export * from "./workflow_2" export * from "./workflow_async" +export * from "./workflow_idempotent" export * from "./workflow_step_timeout" export * from "./workflow_transaction_timeout" diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts new file mode 100644 index 0000000000..9a1bcb16a3 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_idempotent.ts @@ -0,0 +1,68 @@ +import { + StepResponse, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" + +const step_1 = createStep( + "step_1", + jest.fn((input) => { + input.test = "test" + return new StepResponse(input, { compensate: 123 }) + }), + jest.fn((compensateInput) => { + if (!compensateInput) { + return + } + + console.log("reverted", compensateInput.compensate) + return new StepResponse({ + reverted: true, + }) + }) +) + +const step_2 = createStep( + "step_2", + jest.fn((input, context) => { + if (input) { + return new StepResponse({ notAsyncResponse: input.hey }) + } + }), + jest.fn((_, context) => { + return new StepResponse({ + step: context.metadata.action, + idempotency_key: context.metadata.idempotency_key, + reverted: true, + }) + }) +) + +const step_3 = createStep( + "step_3", + jest.fn((res) => { + return new StepResponse({ + done: { + inputFromSyncStep: res.notAsyncResponse, + }, + }) + }) +) + +createWorkflow( + { + name: "workflow_idempotent", + idempotent: true, + }, + function (input) { + step_1(input) + + const ret2 = step_2({ hey: "oh" }) + + step_2({ hey: "hello" }).config({ + name: "new_step_name", + }) + + return step_3(ret2) + } +) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index fff797151f..ba66c60beb 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -5,6 +5,7 @@ import { RemoteQueryFunction, } from "@medusajs/types" import { Modules, TransactionHandlerType } from "@medusajs/utils" +import { moduleIntegrationTestRunner } from "medusa-test-utils" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" @@ -14,7 +15,6 @@ import { workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" -import { moduleIntegrationTestRunner } from "medusa-test-utils" jest.setTimeout(100000) @@ -235,6 +235,10 @@ moduleIntegrationTestRunner({ }) describe("Scheduled workflows", () => { + beforeEach(() => { + jest.clearAllMocks() + }) + beforeAll(() => { jest.useFakeTimers() jest.spyOn(global, "setTimeout") @@ -247,8 +251,6 @@ moduleIntegrationTestRunner({ it("should execute a scheduled workflow", async () => { const spy = createScheduled("standard") - jest.clearAllMocks() - await jest.runOnlyPendingTimersAsync() expect(setTimeout).toHaveBeenCalledTimes(2) expect(spy).toHaveBeenCalledTimes(1) @@ -291,6 +293,46 @@ moduleIntegrationTestRunner({ "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." ) }) + + it("should fetch an idempotent workflow after its completion", async () => { + const { transaction: firstRun } = await workflowOrcModule.run( + "workflow_idempotent", + { + input: { + value: "123", + }, + throwOnError: true, + transactionId: "transaction_1", + } + ) + + let executionsList = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + const { transaction: secondRun } = await workflowOrcModule.run( + "workflow_idempotent", + { + input: { + value: "123", + }, + throwOnError: true, + transactionId: "transaction_1", + } + ) + + const executionsListAfter = await query({ + workflow_executions: { + fields: ["id"], + }, + }) + + expect(secondRun.flow.startedAt).toEqual(firstRun.flow.startedAt) + expect(executionsList).toHaveLength(1) + expect(executionsListAfter).toHaveLength(1) + }) }) }) }, diff --git a/packages/modules/workflow-engine-inmemory/src/index.ts b/packages/modules/workflow-engine-inmemory/src/index.ts index 7aa317c3f4..acc259f684 100644 --- a/packages/modules/workflow-engine-inmemory/src/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/index.ts @@ -3,3 +3,4 @@ import { moduleDefinition } from "./module-definition" export default moduleDefinition export * from "./loaders" +export * from "./models" diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 8d21beea33..e0b300dd5e 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -1,26 +1,23 @@ import { Context, DAL, - FindConfig, InternalModuleDeclaration, - IWorkflowEngineService, ModuleJoinerConfig, ModulesSdkTypes, WorkflowsSdkTypes, } from "@medusajs/types" import { - InjectManager, InjectSharedContext, - isString, MedusaContext, - MedusaError, + ModulesSdkUtils, } from "@medusajs/utils" import type { ReturnWorkflow, UnwrapWorkflowInputDataType, } from "@medusajs/workflows-sdk" +import { WorkflowExecution } from "@models" import { WorkflowOrchestratorService } from "@services" -import { joinerConfig } from "../joiner-config" +import { entityNameToLinkableKeysMap, joinerConfig } from "../joiner-config" type InjectedDependencies = { baseRepository: DAL.RepositoryService @@ -28,9 +25,13 @@ type InjectedDependencies = { workflowOrchestratorService: WorkflowOrchestratorService } -export class WorkflowsModuleService implements IWorkflowEngineService { +export class WorkflowsModuleService< + TWorkflowExecution extends WorkflowExecution = WorkflowExecution +> extends ModulesSdkUtils.MedusaService<{ + WorkflowExecution: { dto: WorkflowExecution } +}>({ WorkflowExecution }, entityNameToLinkableKeysMap) { protected baseRepository_: DAL.RepositoryService - protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService + protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService constructor( @@ -41,6 +42,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }: InjectedDependencies, protected readonly moduleDeclaration: InternalModuleDeclaration ) { + // @ts-ignore + super(...arguments) + this.baseRepository_ = baseRepository this.workflowExecutionService_ = workflowExecutionService this.workflowOrchestratorService_ = workflowOrchestratorService @@ -50,122 +54,6 @@ export class WorkflowsModuleService implements IWorkflowEngineService { return joinerConfig } - @InjectManager("baseRepository_") - async retrieveWorkflowExecution( - idOrObject: - | string - | { - workflow_id: string - transaction_id: string - }, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise { - const objValue = isString(idOrObject) - ? { id: idOrObject } - : { - workflow_id: idOrObject.workflow_id, - transaction_id: idOrObject.transaction_id, - } - - const wfExecution = await this.workflowExecutionService_.list( - objValue, - config, - sharedContext - ) - - if (wfExecution.length === 0) { - throw new MedusaError( - MedusaError.Types.NOT_FOUND, - `WorkflowExecution with ${Object.keys(objValue).join( - ", " - )}: ${Object.values(objValue).join(", ")} was not found` - ) - } - - // eslint-disable-next-line max-len - return await this.baseRepository_.serialize( - wfExecution[0], - { - populate: true, - } - ) - } - - @InjectManager("baseRepository_") - async listWorkflowExecutions( - filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {}, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise { - if (filters.transaction_id) { - if (Array.isArray(filters.transaction_id)) { - filters.transaction_id = { - $in: filters.transaction_id, - } - } - } - - if (filters.workflow_id) { - if (Array.isArray(filters.workflow_id)) { - filters.workflow_id = { - $in: filters.workflow_id, - } - } - } - - const wfExecutions = await this.workflowExecutionService_.list( - filters, - config, - sharedContext - ) - - return await this.baseRepository_.serialize< - WorkflowsSdkTypes.WorkflowExecutionDTO[] - >(wfExecutions, { - populate: true, - }) - } - - @InjectManager("baseRepository_") - async listAndCountWorkflowExecutions( - filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {}, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise<[WorkflowsSdkTypes.WorkflowExecutionDTO[], number]> { - if (filters.transaction_id) { - if (Array.isArray(filters.transaction_id)) { - filters.transaction_id = { - $in: filters.transaction_id, - } - } - } - - if (filters.workflow_id) { - if (Array.isArray(filters.workflow_id)) { - filters.workflow_id = { - $in: filters.workflow_id, - } - } - } - - const [wfExecutions, count] = - await this.workflowExecutionService_.listAndCount( - filters, - config, - sharedContext - ) - - return [ - await this.baseRepository_.serialize< - WorkflowsSdkTypes.WorkflowExecutionDTO[] - >(wfExecutions, { - populate: true, - }), - count, - ] - } - @InjectSharedContext() async run>( workflowIdOrWorkflow: TWorkflow, diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index c59433e6ab..7e60cd15f0 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -1,22 +1,22 @@ import { DistributedTransaction, - DistributedTransactionStorage, IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, TransactionCheckpoint, + TransactionOptions, TransactionStep, } from "@medusajs/orchestration" -import { ModulesSdkTypes } from "@medusajs/types" +import { Logger, ModulesSdkTypes } from "@medusajs/types" import { MedusaError, TransactionState } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" import { CronExpression, parseExpression } from "cron-parser" -// eslint-disable-next-line max-len export class InMemoryDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage { private workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService + private logger_: Logger private workflowOrchestratorService_: WorkflowOrchestratorService private storage: Map = new Map() @@ -34,10 +34,13 @@ export class InMemoryDistributedTransactionStorage constructor({ workflowExecutionService, + logger, }: { workflowExecutionService: ModulesSdkTypes.IMedusaInternalService + logger: Logger }) { this.workflowExecutionService_ = workflowExecutionService + this.logger_ = logger } setWorkflowOrchestratorService(workflowOrchestratorService) { @@ -68,24 +71,43 @@ export class InMemoryDistributedTransactionStorage ]) } - /*private stringifyWithSymbol(key, value) { - if (key === "__type" && typeof value === "symbol") { - return Symbol.keyFor(value) + async get( + key: string, + options?: TransactionOptions + ): Promise { + const data = this.storage.get(key) + + if (data) { + return data } - return value - } - - private jsonWithSymbol(key, value) { - if (key === "__type" && typeof value === "string") { - return Symbol.for(value) + const { idempotent } = options ?? {} + if (!idempotent) { + return } - return value - }*/ + const [_, workflowId, transactionId] = key.split(":") + const trx = await this.workflowExecutionService_ + .retrieve( + { + workflow_id: workflowId, + transaction_id: transactionId, + }, + { + select: ["execution", "context"], + } + ) + .catch(() => undefined) - async get(key: string): Promise { - return this.storage.get(key) + if (trx) { + return { + flow: trx.execution, + context: trx.context.data, + errors: trx.context.errors, + } + } + + return } async list(): Promise { @@ -95,12 +117,11 @@ export class InMemoryDistributedTransactionStorage async save( key: string, data: TransactionCheckpoint, - ttl?: number + ttl?: number, + options?: TransactionOptions ): Promise { this.storage.set(key, data) - let retentionTime - /** * Store the retention time only if the transaction is done, failed or reverted. * From that moment, this tuple can be later on archived or deleted after the retention time. @@ -111,8 +132,9 @@ export class InMemoryDistributedTransactionStorage TransactionState.REVERTED, ].includes(data.flow.state) + const { retentionTime, idempotent } = options ?? {} + if (hasFinished) { - retentionTime = data.flow.options?.retentionTime Object.assign(data, { retention_time: retentionTime, }) @@ -121,7 +143,7 @@ export class InMemoryDistributedTransactionStorage const stringifiedData = JSON.stringify(data) const parsedData = JSON.parse(stringifiedData) - if (hasFinished && !retentionTime) { + if (hasFinished && !retentionTime && !idempotent) { await this.deleteFromDb(parsedData) } else { await this.saveToDb(parsedData) @@ -304,7 +326,7 @@ export class InMemoryDistributedTransactionStorage }) } catch (e) { if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { - console.warn( + this.logger_?.warn( `Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.` ) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index da3a061615..61d14c65b0 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -4,8 +4,18 @@ import { TransactionTimeoutError, WorkflowManager, } from "@medusajs/orchestration" -import { IWorkflowEngineService, RemoteQueryFunction } from "@medusajs/types" -import { TransactionHandlerType, TransactionStepState } from "@medusajs/utils" +import { + IWorkflowEngineService, + MedusaContainer, + RemoteQueryFunction, +} from "@medusajs/types" +import { + ContainerRegistrationKeys, + TransactionHandlerType, + TransactionStepState, + createMedusaContainer, +} from "@medusajs/utils" +import { asValue } from "awilix" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -28,42 +38,47 @@ const afterEach_ = async () => { } describe("Workflow Orchestrator module", function () { - describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowEngineService - let query: RemoteQueryFunction + let workflowOrcModule: IWorkflowEngineService + let query: RemoteQueryFunction + let sharedContainer_: MedusaContainer - afterEach(afterEach_) + beforeAll(async () => { + const container = createMedusaContainer() + container.register(ContainerRegistrationKeys.LOGGER, asValue(console)) - beforeAll(async () => { - const { - runMigrations, - query: remoteQuery, - modules, - } = await MedusaApp({ - sharedResourcesConfig: { - database: { - connection: sharedPgConnection, - }, + const { + runMigrations, + query: remoteQuery, + modules, + sharedContainer, + } = await MedusaApp({ + sharedContainer: container, + sharedResourcesConfig: { + database: { + connection: sharedPgConnection, }, - modulesConfig: { - workflows: { - resolve: __dirname + "/../..", - options: { - redis: { - url: "localhost:6379", - }, + }, + modulesConfig: { + workflows: { + resolve: __dirname + "/../..", + options: { + redis: { + url: "localhost:6379", }, }, }, - }) - - query = remoteQuery - - await runMigrations() - - workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService + }, }) + query = remoteQuery + sharedContainer_ = sharedContainer! + + await runMigrations() + + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService + }) + + describe("Testing basic workflow", function () { afterEach(afterEach_) it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { @@ -320,10 +335,14 @@ describe("Workflow Orchestrator module", function () { }) it("should remove scheduled workflow if workflow no longer exists", async () => { + const logger = sharedContainer_.resolve( + ContainerRegistrationKeys.LOGGER + ) + const spy = await createScheduled("remove-scheduled", { cron: "* * * * * *", }) - const logSpy = jest.spyOn(console, "warn") + const logSpy = jest.spyOn(logger, "warn") await setTimeout(1100) expect(spy).toHaveBeenCalledTimes(1) diff --git a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts index 582baee15c..c15a1ebb9d 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts @@ -46,7 +46,7 @@ async function deleteKeysByPattern(pattern) { async function cleanRedis() { try { await deleteKeysByPattern("bull:*") - await deleteKeysByPattern("dtrans:*") + await deleteKeysByPattern("dtrx:*") } catch (error) { console.error("Error:", error) } diff --git a/packages/modules/workflow-engine-redis/src/index.ts b/packages/modules/workflow-engine-redis/src/index.ts index 7aa317c3f4..acc259f684 100644 --- a/packages/modules/workflow-engine-redis/src/index.ts +++ b/packages/modules/workflow-engine-redis/src/index.ts @@ -3,3 +3,4 @@ import { moduleDefinition } from "./module-definition" export default moduleDefinition export * from "./loaders" +export * from "./models" diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index ddd3d951be..708a13fb3f 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -1,26 +1,23 @@ import { Context, DAL, - FindConfig, InternalModuleDeclaration, - IWorkflowEngineService, ModuleJoinerConfig, ModulesSdkTypes, WorkflowsSdkTypes, } from "@medusajs/types" import { - InjectManager, InjectSharedContext, - isString, MedusaContext, - MedusaError, + ModulesSdkUtils, } from "@medusajs/utils" import type { ReturnWorkflow, UnwrapWorkflowInputDataType, } from "@medusajs/workflows-sdk" +import { WorkflowExecution } from "@models" import { WorkflowOrchestratorService } from "@services" -import { joinerConfig } from "../joiner-config" +import { entityNameToLinkableKeysMap, joinerConfig } from "../joiner-config" type InjectedDependencies = { baseRepository: DAL.RepositoryService @@ -29,9 +26,13 @@ type InjectedDependencies = { redisDisconnectHandler: () => Promise } -export class WorkflowsModuleService implements IWorkflowEngineService { +export class WorkflowsModuleService< + TWorkflowExecution extends WorkflowExecution = WorkflowExecution +> extends ModulesSdkUtils.MedusaService<{ + WorkflowExecution: { dto: WorkflowExecution } +}>({ WorkflowExecution }, entityNameToLinkableKeysMap) { protected baseRepository_: DAL.RepositoryService - protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService + protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService protected redisDisconnectHandler_: () => Promise @@ -44,6 +45,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }: InjectedDependencies, protected readonly moduleDeclaration: InternalModuleDeclaration ) { + // @ts-ignore + super(...arguments) + this.baseRepository_ = baseRepository this.workflowExecutionService_ = workflowExecutionService this.workflowOrchestratorService_ = workflowOrchestratorService @@ -64,122 +68,6 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }, } - @InjectManager("baseRepository_") - async retrieveWorkflowExecution( - idOrObject: - | string - | { - workflow_id: string - transaction_id: string - }, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise { - const objValue = isString(idOrObject) - ? { id: idOrObject } - : { - workflow_id: idOrObject.workflow_id, - transaction_id: idOrObject.transaction_id, - } - - const wfExecution = await this.workflowExecutionService_.list( - objValue, - config, - sharedContext - ) - - if (wfExecution.length === 0) { - throw new MedusaError( - MedusaError.Types.NOT_FOUND, - `WorkflowExecution with ${Object.keys(objValue).join( - ", " - )}: ${Object.values(objValue).join(", ")} was not found` - ) - } - - // eslint-disable-next-line max-len - return await this.baseRepository_.serialize( - wfExecution[0], - { - populate: true, - } - ) - } - - @InjectManager("baseRepository_") - async listWorkflowExecutions( - filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {}, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise { - if (filters.transaction_id) { - if (Array.isArray(filters.transaction_id)) { - filters.transaction_id = { - $in: filters.transaction_id, - } - } - } - - if (filters.workflow_id) { - if (Array.isArray(filters.workflow_id)) { - filters.workflow_id = { - $in: filters.workflow_id, - } - } - } - - const wfExecutions = await this.workflowExecutionService_.list( - filters, - config, - sharedContext - ) - - return await this.baseRepository_.serialize< - WorkflowsSdkTypes.WorkflowExecutionDTO[] - >(wfExecutions, { - populate: true, - }) - } - - @InjectManager("baseRepository_") - async listAndCountWorkflowExecutions( - filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {}, - config: FindConfig = {}, - @MedusaContext() sharedContext: Context = {} - ): Promise<[WorkflowsSdkTypes.WorkflowExecutionDTO[], number]> { - if (filters.transaction_id) { - if (Array.isArray(filters.transaction_id)) { - filters.transaction_id = { - $in: filters.transaction_id, - } - } - } - - if (filters.workflow_id) { - if (Array.isArray(filters.workflow_id)) { - filters.workflow_id = { - $in: filters.workflow_id, - } - } - } - - const [wfExecutions, count] = - await this.workflowExecutionService_.listAndCount( - filters, - config, - sharedContext - ) - - return [ - await this.baseRepository_.serialize< - WorkflowsSdkTypes.WorkflowExecutionDTO[] - >(wfExecutions, { - populate: true, - }), - count, - ] - } - @InjectSharedContext() async run>( workflowIdOrWorkflow: TWorkflow, diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index c12bb20bc0..0a6b208e49 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,9 +4,10 @@ import { IDistributedTransactionStorage, SchedulerOptions, TransactionCheckpoint, + TransactionOptions, TransactionStep, } from "@medusajs/orchestration" -import { ModulesSdkTypes } from "@medusajs/types" +import { Logger, ModulesSdkTypes } from "@medusajs/types" import { MedusaError, TransactionState, promiseAll } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" @@ -19,12 +20,12 @@ enum JobType { TRANSACTION_TIMEOUT = "transaction_timeout", } -// eslint-disable-next-line max-len export class RedisDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage { - private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes + private static TTL_AFTER_COMPLETED = 60 * 2 // 2 minutes private workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService + private logger_: Logger private workflowOrchestratorService_: WorkflowOrchestratorService private redisClient: Redis @@ -36,13 +37,16 @@ export class RedisDistributedTransactionStorage redisConnection, redisWorkerConnection, redisQueueName, + logger, }: { workflowExecutionService: ModulesSdkTypes.IMedusaInternalService redisConnection: Redis redisWorkerConnection: Redis redisQueueName: string + logger: Logger }) { this.workflowExecutionService_ = workflowExecutionService + this.logger_ = logger this.redisClient = redisConnection @@ -131,7 +135,7 @@ export class RedisDistributedTransactionStorage }) } catch (e) { if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { - console.warn( + this.logger_?.warn( `Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.` ) @@ -143,10 +147,42 @@ export class RedisDistributedTransactionStorage } } - async get(key: string): Promise { + async get( + key: string, + options?: TransactionOptions + ): Promise { const data = await this.redisClient.get(key) - return data ? JSON.parse(data) : undefined + if (data) { + return JSON.parse(data) + } + + const { idempotent } = options ?? {} + if (!idempotent) { + return + } + + const [_, workflowId, transactionId] = key.split(":") + const trx = await this.workflowExecutionService_ + .retrieve( + { + workflow_id: workflowId, + transaction_id: transactionId, + }, + { + select: ["execution", "context"], + } + ) + .catch(() => undefined) + + if (trx) { + return { + flow: trx.execution, + context: trx.context.data, + errors: trx.context.errors, + } + } + return } async list(): Promise { @@ -166,10 +202,9 @@ export class RedisDistributedTransactionStorage async save( key: string, data: TransactionCheckpoint, - ttl?: number + ttl?: number, + options?: TransactionOptions ): Promise { - let retentionTime - /** * Store the retention time only if the transaction is done, failed or reverted. * From that moment, this tuple can be later on archived or deleted after the retention time. @@ -180,8 +215,9 @@ export class RedisDistributedTransactionStorage TransactionState.REVERTED, ].includes(data.flow.state) + const { retentionTime, idempotent } = options ?? {} + if (hasFinished) { - retentionTime = data.flow.options?.retentionTime Object.assign(data, { retention_time: retentionTime, }) @@ -198,14 +234,13 @@ export class RedisDistributedTransactionStorage } } - if (hasFinished && !retentionTime) { + if (hasFinished && !retentionTime && !idempotent) { await this.deleteFromDb(parsedData) } else { await this.saveToDb(parsedData) } if (hasFinished) { - // await this.redisClient.del(key) await this.redisClient.set( key, stringifiedData,