diff --git a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index eee8c8b92c..9f2373aea0 100644 --- a/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -1,7 +1,7 @@ import { TransactionStepState, TransactionStepStatus } from "@medusajs/utils" import { setTimeout } from "timers/promises" import { - DistributedTransaction, + DistributedTransactionType, TransactionHandlerType, TransactionOrchestrator, TransactionPayload, @@ -970,7 +970,7 @@ describe("Transaction Orchestrator", () => { actionId: string, functionHandlerType: TransactionHandlerType, payload: TransactionPayload, - transaction?: DistributedTransaction + transaction?: DistributedTransactionType ) { transactionInHandler = transaction } diff --git a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts index 1649efa4e2..3bd37150c1 100644 --- a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts @@ -1,5 +1,5 @@ import { - DistributedTransaction, + DistributedTransactionType, TransactionCheckpoint, } from "../distributed-transaction" import { TransactionStep } from "../transaction-step" @@ -29,29 +29,31 @@ export interface IDistributedTransactionStorage { options?: TransactionOptions ): Promise scheduleRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number ): Promise clearRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise scheduleTransactionTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, timestamp: number, interval: number ): Promise scheduleStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number ): Promise - clearTransactionTimeout(transaction: DistributedTransaction): Promise + clearTransactionTimeout( + transaction: DistributedTransactionType + ): Promise clearStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise } @@ -103,7 +105,7 @@ export abstract class DistributedTransactionStorage } async scheduleRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -112,14 +114,14 @@ export abstract class DistributedTransactionStorage } async clearRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { throw new Error("Method 'clearRetry' not implemented.") } async scheduleTransactionTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, timestamp: number, interval: number ): Promise { @@ -127,13 +129,13 @@ export abstract class DistributedTransactionStorage } async clearTransactionTimeout( - transaction: DistributedTransaction + transaction: DistributedTransactionType ): Promise { throw new Error("Method 'clearTransactionTimeout' not implemented.") } async scheduleStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -142,7 +144,7 @@ export abstract class DistributedTransactionStorage } async clearStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { throw new Error("Method 'clearStepTimeout' not implemented.") diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index a38a84f67d..70a4850d50 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -74,7 +74,7 @@ export class TransactionPayload { * DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order. */ -export class DistributedTransaction extends EventEmitter { +class DistributedTransaction extends EventEmitter { public modelId: string public transactionId: string @@ -302,3 +302,12 @@ export class DistributedTransaction extends EventEmitter { DistributedTransaction.setStorage( new BaseInMemoryDistributedTransactionStorage() ) + +global.DistributedTransaction ??= DistributedTransaction +const GlobalDistributedTransaction = + global.DistributedTransaction as typeof DistributedTransaction + +export { + GlobalDistributedTransaction as DistributedTransaction, + DistributedTransaction as DistributedTransactionType, +} diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 2e80d440c2..63872a84d9 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -1,5 +1,6 @@ import { DistributedTransaction, + DistributedTransactionType, TransactionCheckpoint, TransactionPayload, } from "./distributed-transaction" @@ -173,7 +174,7 @@ export class TransactionOrchestrator extends EventEmitter { transaction, step, }: { - transaction?: DistributedTransaction + transaction?: DistributedTransactionType step?: TransactionStep }, dateNow: number @@ -195,7 +196,7 @@ export class TransactionOrchestrator extends EventEmitter { } private async checkTransactionTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, currentSteps: TransactionStep[] ) { const flow = transaction.getFlow() @@ -224,7 +225,7 @@ export class TransactionOrchestrator extends EventEmitter { } private async checkStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ) { let hasTimedOut = false @@ -249,7 +250,9 @@ export class TransactionOrchestrator extends EventEmitter { return hasTimedOut } - private async checkAllSteps(transaction: DistributedTransaction): Promise<{ + private async checkAllSteps( + transaction: DistributedTransactionType + ): Promise<{ current: TransactionStep[] next: TransactionStep[] total: number @@ -413,7 +416,7 @@ export class TransactionOrchestrator extends EventEmitter { } private static async setStepSuccess( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, response: unknown ): Promise { @@ -464,7 +467,7 @@ export class TransactionOrchestrator extends EventEmitter { } private static async skipStep( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { const hasStepTimedOut = @@ -497,7 +500,7 @@ export class TransactionOrchestrator extends EventEmitter { } private static async setStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, error: TransactionStepTimeoutError | TransactionTimeoutError ): Promise { @@ -532,7 +535,7 @@ export class TransactionOrchestrator extends EventEmitter { } private static async setStepFailure( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, error: Error | any, maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES, @@ -611,7 +614,7 @@ export class TransactionOrchestrator extends EventEmitter { } private async executeNext( - transaction: DistributedTransaction + transaction: DistributedTransactionType ): Promise { let continueExecution = true @@ -854,7 +857,7 @@ export class TransactionOrchestrator extends EventEmitter { * Start a new transaction or resume a transaction that has been previously started * @param transaction - The transaction to resume */ - public async resume(transaction: DistributedTransaction): Promise { + public async resume(transaction: DistributedTransactionType): Promise { if (transaction.modelId !== this.id) { throw new MedusaError( MedusaError.Types.NOT_ALLOWED, @@ -895,7 +898,7 @@ export class TransactionOrchestrator extends EventEmitter { * @param transaction - The transaction to be reverted */ public async cancelTransaction( - transaction: DistributedTransaction + transaction: DistributedTransactionType ): Promise { if (transaction.modelId !== this.id) { throw new MedusaError( @@ -1106,7 +1109,7 @@ export class TransactionOrchestrator extends EventEmitter { handler: TransactionStepHandler, payload?: unknown, flowMetadata?: TransactionFlow["metadata"] - ): Promise { + ): Promise { const existingTransaction = await TransactionOrchestrator.loadTransactionById(this.id, transactionId) @@ -1147,7 +1150,7 @@ export class TransactionOrchestrator extends EventEmitter { public async retrieveExistingTransaction( transactionId: string, handler: TransactionStepHandler - ): Promise { + ): Promise { const existingTransaction = await TransactionOrchestrator.loadTransactionById(this.id, transactionId) @@ -1184,8 +1187,8 @@ export class TransactionOrchestrator extends EventEmitter { private static async getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey: string, handler?: TransactionStepHandler, - transaction?: DistributedTransaction - ): Promise<[DistributedTransaction, TransactionStep]> { + transaction?: DistributedTransactionType + ): Promise<[DistributedTransactionType, TransactionStep]> { const [modelId, transactionId, action, actionType] = responseIdempotencyKey.split(TransactionOrchestrator.SEPARATOR) @@ -1243,8 +1246,8 @@ export class TransactionOrchestrator extends EventEmitter { public async skipStep( responseIdempotencyKey: string, handler?: TransactionStepHandler, - transaction?: DistributedTransaction - ): Promise { + transaction?: DistributedTransactionType + ): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, @@ -1279,9 +1282,9 @@ export class TransactionOrchestrator extends EventEmitter { public async registerStepSuccess( responseIdempotencyKey: string, handler?: TransactionStepHandler, - transaction?: DistributedTransaction, + transaction?: DistributedTransactionType, response?: unknown - ): Promise { + ): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, @@ -1323,8 +1326,8 @@ export class TransactionOrchestrator extends EventEmitter { responseIdempotencyKey: string, error?: Error | any, handler?: TransactionStepHandler, - transaction?: DistributedTransaction - ): Promise { + transaction?: DistributedTransactionType + ): Promise { const [curTransaction, step] = await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey( responseIdempotencyKey, diff --git a/packages/core/orchestration/src/transaction/transaction-step.ts b/packages/core/orchestration/src/transaction/transaction-step.ts index abc0437000..e1157abf90 100644 --- a/packages/core/orchestration/src/transaction/transaction-step.ts +++ b/packages/core/orchestration/src/transaction/transaction-step.ts @@ -1,6 +1,6 @@ import { MedusaError, TransactionStepState } from "@medusajs/utils" import { - DistributedTransaction, + DistributedTransactionType, TransactionPayload, } from "./distributed-transaction" import { TransactionOrchestrator } from "./transaction-orchestrator" @@ -15,7 +15,7 @@ export type TransactionStepHandler = ( actionId: string, handlerType: TransactionHandlerType, payload: TransactionPayload, - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, orchestrator: TransactionOrchestrator ) => Promise diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index fee68169d4..7f86ce5557 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -1,4 +1,4 @@ -import { DistributedTransaction } from "./distributed-transaction" +import { DistributedTransactionType } from "./distributed-transaction" import { TransactionStep } from "./transaction-step" export { TransactionHandlerType, @@ -172,50 +172,52 @@ export enum DistributedTransactionEvent { } export type DistributedTransactionEvents = { - onBegin?: (args: { transaction: DistributedTransaction }) => void - onResume?: (args: { transaction: DistributedTransaction }) => void + onBegin?: (args: { transaction: DistributedTransactionType }) => void + onResume?: (args: { transaction: DistributedTransactionType }) => void onFinish?: (args: { - transaction: DistributedTransaction + transaction: DistributedTransactionType result?: unknown errors?: unknown[] }) => void - onTimeout?: (args: { transaction: DistributedTransaction }) => void + onTimeout?: (args: { transaction: DistributedTransactionType }) => void onStepBegin?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void onStepSuccess?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void onStepFailure?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void onStepAwaiting?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void - onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void + onCompensateBegin?: (args: { + transaction: DistributedTransactionType + }) => void onCompensateStepSuccess?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void onCompensateStepFailure?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void onStepSkipped?: (args: { step: TransactionStep - transaction: DistributedTransaction + transaction: DistributedTransactionType }) => void } diff --git a/packages/core/orchestration/src/workflow/global-workflow.ts b/packages/core/orchestration/src/workflow/global-workflow.ts index 488579a1d4..b97baca5c6 100644 --- a/packages/core/orchestration/src/workflow/global-workflow.ts +++ b/packages/core/orchestration/src/workflow/global-workflow.ts @@ -3,7 +3,7 @@ import { createMedusaContainer } from "@medusajs/utils" import { asValue } from "awilix" import { - DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvents, } from "../transaction" import { WorkflowDefinition, WorkflowManager } from "./workflow-manager" @@ -83,7 +83,7 @@ export class GlobalWorkflow extends WorkflowManager { workflowId: string, idempotencyKey: string, response?: unknown - ): Promise { + ): Promise { if (!WorkflowManager.workflows.has(workflowId)) { throw new Error(`Workflow with id "${workflowId}" not found.`) } @@ -116,7 +116,7 @@ export class GlobalWorkflow extends WorkflowManager { workflowId: string, idempotencyKey: string, error?: Error | any - ): Promise { + ): Promise { if (!WorkflowManager.workflows.has(workflowId)) { throw new Error(`Workflow with id "${workflowId}" not found.`) } diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index e72157d73a..2ffc0deafd 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -10,7 +10,7 @@ import { } from "@medusajs/utils" import { asValue } from "awilix" import { - DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvent, DistributedTransactionEvents, TransactionFlow, @@ -167,7 +167,7 @@ export class LocalWorkflow { idempotencyKey, }: { orchestrator: TransactionOrchestrator - transaction?: DistributedTransaction + transaction?: DistributedTransactionType subscribe?: DistributedTransactionEvents idempotencyKey?: string }) { @@ -382,7 +382,7 @@ export class LocalWorkflow { } async cancel( - transactionOrTransactionId: string | DistributedTransaction, + transactionOrTransactionId: string | DistributedTransactionType, context?: Context, subscribe?: DistributedTransactionEvents ) { @@ -411,7 +411,7 @@ export class LocalWorkflow { response?: unknown, context?: Context, subscribe?: DistributedTransactionEvents - ): Promise { + ): Promise { this.medusaContext = context const { handler, orchestrator } = this.workflow @@ -438,7 +438,7 @@ export class LocalWorkflow { error?: Error | any, context?: Context, subscribe?: DistributedTransactionEvents - ): Promise { + ): Promise { this.medusaContext = context const { handler, orchestrator } = this.workflow diff --git a/packages/core/orchestration/src/workflow/scheduler.ts b/packages/core/orchestration/src/workflow/scheduler.ts index 89eaa20172..5e2b07621e 100644 --- a/packages/core/orchestration/src/workflow/scheduler.ts +++ b/packages/core/orchestration/src/workflow/scheduler.ts @@ -2,7 +2,7 @@ import { MedusaError } from "@medusajs/utils" import { IDistributedSchedulerStorage, SchedulerOptions } from "../transaction" import { WorkflowDefinition } from "./workflow-manager" -export class WorkflowScheduler { +class WorkflowScheduler { private static storage: IDistributedSchedulerStorage public static setStorage(storage: IDistributedSchedulerStorage) { this.storage = storage @@ -40,3 +40,9 @@ export class WorkflowScheduler { await WorkflowScheduler.storage.removeAll() } } + +global.WorkflowScheduler ??= WorkflowScheduler +const GlobalWorkflowScheduler = + global.WorkflowScheduler as typeof WorkflowScheduler + +export { GlobalWorkflowScheduler as WorkflowScheduler } diff --git a/packages/core/orchestration/src/workflow/workflow-manager.ts b/packages/core/orchestration/src/workflow/workflow-manager.ts index 2e56839dcf..c2f6a40d93 100644 --- a/packages/core/orchestration/src/workflow/workflow-manager.ts +++ b/packages/core/orchestration/src/workflow/workflow-manager.ts @@ -1,6 +1,6 @@ import { Context, MedusaContainer } from "@medusajs/types" import { - DistributedTransaction, + DistributedTransactionType, OrchestratorBuilder, TransactionHandlerType, TransactionMetadata, @@ -40,7 +40,7 @@ export type WorkflowStepHandlerArguments = { invoke: { [actions: string]: unknown } compensate: { [actions: string]: unknown } metadata: TransactionMetadata - transaction: DistributedTransaction + transaction: DistributedTransactionType step: TransactionStep orchestrator: TransactionOrchestrator context?: Context @@ -194,7 +194,7 @@ class WorkflowManager { actionId: string, handlerType: TransactionHandlerType, payload: any, - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, orchestrator: TransactionOrchestrator ) => { @@ -217,7 +217,7 @@ class WorkflowManager { invoke, compensate, metadata, - transaction: transaction as DistributedTransaction, + transaction: transaction as DistributedTransactionType, step, orchestrator, context, diff --git a/packages/core/workflows-sdk/src/helper/type.ts b/packages/core/workflows-sdk/src/helper/type.ts index 2c77c40425..74521d1ad2 100644 --- a/packages/core/workflows-sdk/src/helper/type.ts +++ b/packages/core/workflows-sdk/src/helper/type.ts @@ -1,6 +1,6 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { - DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvents, LocalWorkflow, TransactionStepError, @@ -36,7 +36,7 @@ export type FlowRegisterStepFailureOptions = { } export type FlowCancelOptions = { - transaction?: DistributedTransaction + transaction?: DistributedTransactionType transactionId?: string context?: Context throwOnError?: boolean @@ -55,7 +55,7 @@ export type WorkflowResult = { /** * The transaction details of the workflow's execution. */ - transaction: DistributedTransaction + transaction: DistributedTransactionType /** * The result returned by the workflow. */ diff --git a/packages/core/workflows-sdk/src/helper/workflow-export.ts b/packages/core/workflows-sdk/src/helper/workflow-export.ts index a9ad772c6b..f260e1b8e5 100644 --- a/packages/core/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/core/workflows-sdk/src/helper/workflow-export.ts @@ -1,6 +1,6 @@ import { MedusaModule } from "@medusajs/modules-sdk" import { - DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvents, LocalWorkflow, TransactionHandlerType, @@ -66,7 +66,7 @@ function createContextualWorkflowRunner< isCancel = false, container: executionContainer, }, - transactionOrIdOrIdempotencyKey: DistributedTransaction | string, + transactionOrIdOrIdempotencyKey: DistributedTransactionType | string, input: unknown, context: Context, events: DistributedTransactionEvents | undefined = {} @@ -483,7 +483,7 @@ function attachOnFinishReleaseEvents( const onFinish = events.onFinish const wrappedOnFinish = async (args: { - transaction: DistributedTransaction + transaction: DistributedTransactionType result?: unknown errors?: unknown[] }) => { diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 0ea912032f..b0713fcfad 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -1,5 +1,6 @@ import { DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvents, TransactionHandlerType, TransactionStep, @@ -178,7 +179,7 @@ export class WorkflowOrchestratorService { transactionId: string, options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} - ): Promise { + ): Promise { let { context, container } = options ?? {} if (!workflowId) { diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 7e60cd15f0..a02a9f44bc 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -1,5 +1,5 @@ import { - DistributedTransaction, + DistributedTransactionType, IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, @@ -155,7 +155,7 @@ export class InMemoryDistributedTransactionStorage } async scheduleRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -174,7 +174,7 @@ export class InMemoryDistributedTransactionStorage } async clearRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { const { modelId: workflowId, transactionId } = transaction @@ -188,7 +188,7 @@ export class InMemoryDistributedTransactionStorage } async scheduleTransactionTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, timestamp: number, interval: number ): Promise { @@ -206,7 +206,7 @@ export class InMemoryDistributedTransactionStorage } async clearTransactionTimeout( - transaction: DistributedTransaction + transaction: DistributedTransactionType ): Promise { const { modelId: workflowId, transactionId } = transaction @@ -219,7 +219,7 @@ export class InMemoryDistributedTransactionStorage } async scheduleStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -238,7 +238,7 @@ export class InMemoryDistributedTransactionStorage } async clearStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { const { modelId: workflowId, transactionId } = transaction diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index c138b9667b..2d2ee517d3 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -1,5 +1,6 @@ import { DistributedTransaction, + DistributedTransactionType, DistributedTransactionEvents, TransactionHandlerType, TransactionStep, @@ -221,7 +222,7 @@ export class WorkflowOrchestratorService { transactionId: string, options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} - ): Promise { + ): Promise { let { context, container } = options ?? {} if (!workflowId) { diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 9aff012499..a3a5cf6a87 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -1,5 +1,6 @@ import { DistributedTransaction, + DistributedTransactionType, IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, @@ -256,7 +257,7 @@ export class RedisDistributedTransactionStorage } async scheduleRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -277,14 +278,14 @@ export class RedisDistributedTransactionStorage } async clearRetry( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { await this.removeJob(JobType.RETRY, transaction, step) } async scheduleTransactionTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, timestamp: number, interval: number ): Promise { @@ -303,13 +304,13 @@ export class RedisDistributedTransactionStorage } async clearTransactionTimeout( - transaction: DistributedTransaction + transaction: DistributedTransactionType ): Promise { await this.removeJob(JobType.TRANSACTION_TIMEOUT, transaction) } async scheduleStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep, timestamp: number, interval: number @@ -330,7 +331,7 @@ export class RedisDistributedTransactionStorage } async clearStepTimeout( - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step: TransactionStep ): Promise { await this.removeJob(JobType.STEP_TIMEOUT, transaction, step) @@ -338,7 +339,7 @@ export class RedisDistributedTransactionStorage private getJobId( type: JobType, - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step?: TransactionStep ) { const key = [type, transaction.modelId, transaction.transactionId] @@ -355,7 +356,7 @@ export class RedisDistributedTransactionStorage private async removeJob( type: JobType, - transaction: DistributedTransaction, + transaction: DistributedTransactionType, step?: TransactionStep ) { const jobId = this.getJobId(type, transaction, step) @@ -387,6 +388,14 @@ export class RedisDistributedTransactionStorage limit: schedulerOptions.numberOfExecutions, key: `${JobType.SCHEDULE}_${jobId}`, }, + removeOnComplete: { + age: 86400, + count: 1000, + }, + removeOnFail: { + age: 604800, + count: 5000, + }, } ) }