fix: Use a global reference to distributed transaction and scheduler (#8462)

* fix: Add storage cleanup on scheduled jobs

* fix: Use a global reference to distributed transaction and scheduler classes
This commit is contained in:
Stevche Radevski
2024-08-06 14:14:51 +02:00
committed by GitHub
parent c870302400
commit 4155d0354f
16 changed files with 121 additions and 88 deletions

View File

@@ -1,7 +1,7 @@
import { TransactionStepState, TransactionStepStatus } from "@medusajs/utils"
import { setTimeout } from "timers/promises"
import {
DistributedTransaction,
DistributedTransactionType,
TransactionHandlerType,
TransactionOrchestrator,
TransactionPayload,
@@ -970,7 +970,7 @@ describe("Transaction Orchestrator", () => {
actionId: string,
functionHandlerType: TransactionHandlerType,
payload: TransactionPayload,
transaction?: DistributedTransaction
transaction?: DistributedTransactionType
) {
transactionInHandler = transaction
}

View File

@@ -1,5 +1,5 @@
import {
DistributedTransaction,
DistributedTransactionType,
TransactionCheckpoint,
} from "../distributed-transaction"
import { TransactionStep } from "../transaction-step"
@@ -29,29 +29,31 @@ export interface IDistributedTransactionStorage {
options?: TransactionOptions
): Promise<void>
scheduleRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void>
clearRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void>
scheduleTransactionTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
timestamp: number,
interval: number
): Promise<void>
scheduleStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void>
clearTransactionTimeout(transaction: DistributedTransaction): Promise<void>
clearTransactionTimeout(
transaction: DistributedTransactionType
): Promise<void>
clearStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void>
}
@@ -103,7 +105,7 @@ export abstract class DistributedTransactionStorage
}
async scheduleRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -112,14 +114,14 @@ export abstract class DistributedTransactionStorage
}
async clearRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
throw new Error("Method 'clearRetry' not implemented.")
}
async scheduleTransactionTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
timestamp: number,
interval: number
): Promise<void> {
@@ -127,13 +129,13 @@ export abstract class DistributedTransactionStorage
}
async clearTransactionTimeout(
transaction: DistributedTransaction
transaction: DistributedTransactionType
): Promise<void> {
throw new Error("Method 'clearTransactionTimeout' not implemented.")
}
async scheduleStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -142,7 +144,7 @@ export abstract class DistributedTransactionStorage
}
async clearStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
throw new Error("Method 'clearStepTimeout' not implemented.")

View File

@@ -74,7 +74,7 @@ export class TransactionPayload {
* DistributedTransaction represents a distributed transaction, which is a transaction that is composed of multiple steps that are executed in a specific order.
*/
export class DistributedTransaction extends EventEmitter {
class DistributedTransaction extends EventEmitter {
public modelId: string
public transactionId: string
@@ -302,3 +302,12 @@ export class DistributedTransaction extends EventEmitter {
DistributedTransaction.setStorage(
new BaseInMemoryDistributedTransactionStorage()
)
global.DistributedTransaction ??= DistributedTransaction
const GlobalDistributedTransaction =
global.DistributedTransaction as typeof DistributedTransaction
export {
GlobalDistributedTransaction as DistributedTransaction,
DistributedTransaction as DistributedTransactionType,
}

View File

@@ -1,5 +1,6 @@
import {
DistributedTransaction,
DistributedTransactionType,
TransactionCheckpoint,
TransactionPayload,
} from "./distributed-transaction"
@@ -173,7 +174,7 @@ export class TransactionOrchestrator extends EventEmitter {
transaction,
step,
}: {
transaction?: DistributedTransaction
transaction?: DistributedTransactionType
step?: TransactionStep
},
dateNow: number
@@ -195,7 +196,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private async checkTransactionTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
currentSteps: TransactionStep[]
) {
const flow = transaction.getFlow()
@@ -224,7 +225,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private async checkStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
) {
let hasTimedOut = false
@@ -249,7 +250,9 @@ export class TransactionOrchestrator extends EventEmitter {
return hasTimedOut
}
private async checkAllSteps(transaction: DistributedTransaction): Promise<{
private async checkAllSteps(
transaction: DistributedTransactionType
): Promise<{
current: TransactionStep[]
next: TransactionStep[]
total: number
@@ -413,7 +416,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private static async setStepSuccess(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
response: unknown
): Promise<void> {
@@ -464,7 +467,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private static async skipStep(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
const hasStepTimedOut =
@@ -497,7 +500,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private static async setStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
error: TransactionStepTimeoutError | TransactionTimeoutError
): Promise<void> {
@@ -532,7 +535,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private static async setStepFailure(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
error: Error | any,
maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES,
@@ -611,7 +614,7 @@ export class TransactionOrchestrator extends EventEmitter {
}
private async executeNext(
transaction: DistributedTransaction
transaction: DistributedTransactionType
): Promise<void> {
let continueExecution = true
@@ -854,7 +857,7 @@ export class TransactionOrchestrator extends EventEmitter {
* Start a new transaction or resume a transaction that has been previously started
* @param transaction - The transaction to resume
*/
public async resume(transaction: DistributedTransaction): Promise<void> {
public async resume(transaction: DistributedTransactionType): Promise<void> {
if (transaction.modelId !== this.id) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
@@ -895,7 +898,7 @@ export class TransactionOrchestrator extends EventEmitter {
* @param transaction - The transaction to be reverted
*/
public async cancelTransaction(
transaction: DistributedTransaction
transaction: DistributedTransactionType
): Promise<void> {
if (transaction.modelId !== this.id) {
throw new MedusaError(
@@ -1106,7 +1109,7 @@ export class TransactionOrchestrator extends EventEmitter {
handler: TransactionStepHandler,
payload?: unknown,
flowMetadata?: TransactionFlow["metadata"]
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
@@ -1147,7 +1150,7 @@ export class TransactionOrchestrator extends EventEmitter {
public async retrieveExistingTransaction(
transactionId: string,
handler: TransactionStepHandler
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
@@ -1184,8 +1187,8 @@ export class TransactionOrchestrator extends EventEmitter {
private static async getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<[DistributedTransaction, TransactionStep]> {
transaction?: DistributedTransactionType
): Promise<[DistributedTransactionType, TransactionStep]> {
const [modelId, transactionId, action, actionType] =
responseIdempotencyKey.split(TransactionOrchestrator.SEPARATOR)
@@ -1243,8 +1246,8 @@ export class TransactionOrchestrator extends EventEmitter {
public async skipStep(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
transaction?: DistributedTransactionType
): Promise<DistributedTransactionType> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
@@ -1279,9 +1282,9 @@ export class TransactionOrchestrator extends EventEmitter {
public async registerStepSuccess(
responseIdempotencyKey: string,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction,
transaction?: DistributedTransactionType,
response?: unknown
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,
@@ -1323,8 +1326,8 @@ export class TransactionOrchestrator extends EventEmitter {
responseIdempotencyKey: string,
error?: Error | any,
handler?: TransactionStepHandler,
transaction?: DistributedTransaction
): Promise<DistributedTransaction> {
transaction?: DistributedTransactionType
): Promise<DistributedTransactionType> {
const [curTransaction, step] =
await TransactionOrchestrator.getTransactionAndStepFromIdempotencyKey(
responseIdempotencyKey,

View File

@@ -1,6 +1,6 @@
import { MedusaError, TransactionStepState } from "@medusajs/utils"
import {
DistributedTransaction,
DistributedTransactionType,
TransactionPayload,
} from "./distributed-transaction"
import { TransactionOrchestrator } from "./transaction-orchestrator"
@@ -15,7 +15,7 @@ export type TransactionStepHandler = (
actionId: string,
handlerType: TransactionHandlerType,
payload: TransactionPayload,
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
orchestrator: TransactionOrchestrator
) => Promise<unknown>

View File

@@ -1,4 +1,4 @@
import { DistributedTransaction } from "./distributed-transaction"
import { DistributedTransactionType } from "./distributed-transaction"
import { TransactionStep } from "./transaction-step"
export {
TransactionHandlerType,
@@ -172,50 +172,52 @@ export enum DistributedTransactionEvent {
}
export type DistributedTransactionEvents = {
onBegin?: (args: { transaction: DistributedTransaction }) => void
onResume?: (args: { transaction: DistributedTransaction }) => void
onBegin?: (args: { transaction: DistributedTransactionType }) => void
onResume?: (args: { transaction: DistributedTransactionType }) => void
onFinish?: (args: {
transaction: DistributedTransaction
transaction: DistributedTransactionType
result?: unknown
errors?: unknown[]
}) => void
onTimeout?: (args: { transaction: DistributedTransaction }) => void
onTimeout?: (args: { transaction: DistributedTransactionType }) => void
onStepBegin?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onStepSuccess?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onStepFailure?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onStepAwaiting?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void
onCompensateBegin?: (args: {
transaction: DistributedTransactionType
}) => void
onCompensateStepSuccess?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onCompensateStepFailure?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
onStepSkipped?: (args: {
step: TransactionStep
transaction: DistributedTransaction
transaction: DistributedTransactionType
}) => void
}

View File

@@ -3,7 +3,7 @@ import { createMedusaContainer } from "@medusajs/utils"
import { asValue } from "awilix"
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvents,
} from "../transaction"
import { WorkflowDefinition, WorkflowManager } from "./workflow-manager"
@@ -83,7 +83,7 @@ export class GlobalWorkflow extends WorkflowManager {
workflowId: string,
idempotencyKey: string,
response?: unknown
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
@@ -116,7 +116,7 @@ export class GlobalWorkflow extends WorkflowManager {
workflowId: string,
idempotencyKey: string,
error?: Error | any
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
if (!WorkflowManager.workflows.has(workflowId)) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}

View File

@@ -10,7 +10,7 @@ import {
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvent,
DistributedTransactionEvents,
TransactionFlow,
@@ -167,7 +167,7 @@ export class LocalWorkflow {
idempotencyKey,
}: {
orchestrator: TransactionOrchestrator
transaction?: DistributedTransaction
transaction?: DistributedTransactionType
subscribe?: DistributedTransactionEvents
idempotencyKey?: string
}) {
@@ -382,7 +382,7 @@ export class LocalWorkflow {
}
async cancel(
transactionOrTransactionId: string | DistributedTransaction,
transactionOrTransactionId: string | DistributedTransactionType,
context?: Context,
subscribe?: DistributedTransactionEvents
) {
@@ -411,7 +411,7 @@ export class LocalWorkflow {
response?: unknown,
context?: Context,
subscribe?: DistributedTransactionEvents
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
this.medusaContext = context
const { handler, orchestrator } = this.workflow
@@ -438,7 +438,7 @@ export class LocalWorkflow {
error?: Error | any,
context?: Context,
subscribe?: DistributedTransactionEvents
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
this.medusaContext = context
const { handler, orchestrator } = this.workflow

View File

@@ -2,7 +2,7 @@ import { MedusaError } from "@medusajs/utils"
import { IDistributedSchedulerStorage, SchedulerOptions } from "../transaction"
import { WorkflowDefinition } from "./workflow-manager"
export class WorkflowScheduler {
class WorkflowScheduler {
private static storage: IDistributedSchedulerStorage
public static setStorage(storage: IDistributedSchedulerStorage) {
this.storage = storage
@@ -40,3 +40,9 @@ export class WorkflowScheduler {
await WorkflowScheduler.storage.removeAll()
}
}
global.WorkflowScheduler ??= WorkflowScheduler
const GlobalWorkflowScheduler =
global.WorkflowScheduler as typeof WorkflowScheduler
export { GlobalWorkflowScheduler as WorkflowScheduler }

View File

@@ -1,6 +1,6 @@
import { Context, MedusaContainer } from "@medusajs/types"
import {
DistributedTransaction,
DistributedTransactionType,
OrchestratorBuilder,
TransactionHandlerType,
TransactionMetadata,
@@ -40,7 +40,7 @@ export type WorkflowStepHandlerArguments = {
invoke: { [actions: string]: unknown }
compensate: { [actions: string]: unknown }
metadata: TransactionMetadata
transaction: DistributedTransaction
transaction: DistributedTransactionType
step: TransactionStep
orchestrator: TransactionOrchestrator
context?: Context
@@ -194,7 +194,7 @@ class WorkflowManager {
actionId: string,
handlerType: TransactionHandlerType,
payload: any,
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
orchestrator: TransactionOrchestrator
) => {
@@ -217,7 +217,7 @@ class WorkflowManager {
invoke,
compensate,
metadata,
transaction: transaction as DistributedTransaction,
transaction: transaction as DistributedTransactionType,
step,
orchestrator,
context,

View File

@@ -1,6 +1,6 @@
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvents,
LocalWorkflow,
TransactionStepError,
@@ -36,7 +36,7 @@ export type FlowRegisterStepFailureOptions<TData = unknown> = {
}
export type FlowCancelOptions = {
transaction?: DistributedTransaction
transaction?: DistributedTransactionType
transactionId?: string
context?: Context
throwOnError?: boolean
@@ -55,7 +55,7 @@ export type WorkflowResult<TResult = unknown> = {
/**
* The transaction details of the workflow's execution.
*/
transaction: DistributedTransaction
transaction: DistributedTransactionType
/**
* The result returned by the workflow.
*/

View File

@@ -1,6 +1,6 @@
import { MedusaModule } from "@medusajs/modules-sdk"
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvents,
LocalWorkflow,
TransactionHandlerType,
@@ -66,7 +66,7 @@ function createContextualWorkflowRunner<
isCancel = false,
container: executionContainer,
},
transactionOrIdOrIdempotencyKey: DistributedTransaction | string,
transactionOrIdOrIdempotencyKey: DistributedTransactionType | string,
input: unknown,
context: Context,
events: DistributedTransactionEvents | undefined = {}
@@ -483,7 +483,7 @@ function attachOnFinishReleaseEvents(
const onFinish = events.onFinish
const wrappedOnFinish = async (args: {
transaction: DistributedTransaction
transaction: DistributedTransactionType
result?: unknown
errors?: unknown[]
}) => {

View File

@@ -1,5 +1,6 @@
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvents,
TransactionHandlerType,
TransactionStep,
@@ -178,7 +179,7 @@ export class WorkflowOrchestratorService {
transactionId: string,
options?: WorkflowOrchestratorRunOptions<undefined>,
@MedusaContext() sharedContext: Context = {}
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
let { context, container } = options ?? {}
if (!workflowId) {

View File

@@ -1,5 +1,5 @@
import {
DistributedTransaction,
DistributedTransactionType,
IDistributedSchedulerStorage,
IDistributedTransactionStorage,
SchedulerOptions,
@@ -155,7 +155,7 @@ export class InMemoryDistributedTransactionStorage
}
async scheduleRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -174,7 +174,7 @@ export class InMemoryDistributedTransactionStorage
}
async clearRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
const { modelId: workflowId, transactionId } = transaction
@@ -188,7 +188,7 @@ export class InMemoryDistributedTransactionStorage
}
async scheduleTransactionTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
timestamp: number,
interval: number
): Promise<void> {
@@ -206,7 +206,7 @@ export class InMemoryDistributedTransactionStorage
}
async clearTransactionTimeout(
transaction: DistributedTransaction
transaction: DistributedTransactionType
): Promise<void> {
const { modelId: workflowId, transactionId } = transaction
@@ -219,7 +219,7 @@ export class InMemoryDistributedTransactionStorage
}
async scheduleStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -238,7 +238,7 @@ export class InMemoryDistributedTransactionStorage
}
async clearStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
const { modelId: workflowId, transactionId } = transaction

View File

@@ -1,5 +1,6 @@
import {
DistributedTransaction,
DistributedTransactionType,
DistributedTransactionEvents,
TransactionHandlerType,
TransactionStep,
@@ -221,7 +222,7 @@ export class WorkflowOrchestratorService {
transactionId: string,
options?: WorkflowOrchestratorRunOptions<undefined>,
@MedusaContext() sharedContext: Context = {}
): Promise<DistributedTransaction> {
): Promise<DistributedTransactionType> {
let { context, container } = options ?? {}
if (!workflowId) {

View File

@@ -1,5 +1,6 @@
import {
DistributedTransaction,
DistributedTransactionType,
IDistributedSchedulerStorage,
IDistributedTransactionStorage,
SchedulerOptions,
@@ -256,7 +257,7 @@ export class RedisDistributedTransactionStorage
}
async scheduleRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -277,14 +278,14 @@ export class RedisDistributedTransactionStorage
}
async clearRetry(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
await this.removeJob(JobType.RETRY, transaction, step)
}
async scheduleTransactionTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
timestamp: number,
interval: number
): Promise<void> {
@@ -303,13 +304,13 @@ export class RedisDistributedTransactionStorage
}
async clearTransactionTimeout(
transaction: DistributedTransaction
transaction: DistributedTransactionType
): Promise<void> {
await this.removeJob(JobType.TRANSACTION_TIMEOUT, transaction)
}
async scheduleStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep,
timestamp: number,
interval: number
@@ -330,7 +331,7 @@ export class RedisDistributedTransactionStorage
}
async clearStepTimeout(
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
await this.removeJob(JobType.STEP_TIMEOUT, transaction, step)
@@ -338,7 +339,7 @@ export class RedisDistributedTransactionStorage
private getJobId(
type: JobType,
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step?: TransactionStep
) {
const key = [type, transaction.modelId, transaction.transactionId]
@@ -355,7 +356,7 @@ export class RedisDistributedTransactionStorage
private async removeJob(
type: JobType,
transaction: DistributedTransaction,
transaction: DistributedTransactionType,
step?: TransactionStep
) {
const jobId = this.getJobId(type, transaction, step)
@@ -387,6 +388,14 @@ export class RedisDistributedTransactionStorage
limit: schedulerOptions.numberOfExecutions,
key: `${JobType.SCHEDULE}_${jobId}`,
},
removeOnComplete: {
age: 86400,
count: 1000,
},
removeOnFail: {
age: 604800,
count: 5000,
},
}
)
}