chore(medusa, orchestration, utils, workflows-sdk): add transaction options support and cleanup (#6020)

* chore(medusa, orchestration, utils, workflows-sdk): add transaction options support and cleanup
This commit is contained in:
Adrien de Peretti
2024-01-08 14:07:12 +01:00
committed by GitHub
parent 0c858f7fd4
commit fbee006e51
9 changed files with 92 additions and 16 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/medusa": patch
"@medusajs/orchestration": patch
"@medusajs/utils": patch
"@medusajs/workflows-sdk": patch
---
chore(medusa, orchestration, utils, workflows-sdk): add transaction options support and cleanup

View File

@@ -45,7 +45,7 @@ async function loadLegacyModulesEntities(configModules, container) {
for (const [moduleName, moduleConfig] of Object.entries(configModules)) {
const definition = ModulesDefinition[moduleName]
if (!definition.isLegacy) {
if (!definition?.isLegacy) {
continue
}

View File

@@ -9,8 +9,8 @@ import {
TransactionHandlerType,
TransactionModelOptions,
TransactionState,
TransactionStepStatus,
TransactionStepsDefinition,
TransactionStepStatus,
} from "./types"
import { MedusaError, promiseAll } from "@medusajs/utils"
@@ -61,6 +61,11 @@ export class TransactionOrchestrator extends EventEmitter {
public static getKeyName(...params: string[]): string {
return params.join(this.SEPARATOR)
}
public getOptions(): TransactionModelOptions {
return this.options ?? {}
}
private getPreviousStep(flow: TransactionFlow, step: TransactionStep) {
const id = step.id.split(".")
id.pop()
@@ -465,7 +470,7 @@ export class TransactionOrchestrator extends EventEmitter {
if (flow.options?.retentionTime == undefined) {
await transaction.deleteCheckpoint()
} else {
await transaction.archiveCheckpoint()
await transaction.saveCheckpoint()
}
this.emit(DistributedTransactionEvent.FINISH, { transaction })

View File

@@ -5,6 +5,7 @@ import {
DistributedTransaction,
DistributedTransactionEvent,
DistributedTransactionEvents,
TransactionModelOptions,
TransactionOrchestrator,
TransactionStepsDefinition,
} from "../transaction"
@@ -24,6 +25,7 @@ export class LocalWorkflow {
protected container: MedusaContainer
protected workflowId: string
protected flow: OrchestratorBuilder
protected customOptions: Partial<TransactionModelOptions> = {}
protected workflow: WorkflowDefinition
protected handlers: Map<string, StepHandler>
@@ -64,10 +66,21 @@ export class LocalWorkflow {
protected commit() {
const finalFlow = this.flow.build()
const globalWorkflow = WorkflowManager.getWorkflow(this.workflowId)
const customOptions = {
...globalWorkflow?.options,
...this.customOptions,
}
this.workflow = {
id: this.workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(this.workflowId, finalFlow),
orchestrator: new TransactionOrchestrator(
this.workflowId,
finalFlow,
customOptions
),
options: customOptions,
handler: WorkflowManager.buildHandlers(this.handlers),
handlers_: this.handlers,
}
@@ -361,6 +374,11 @@ export class LocalWorkflow {
return transaction
}
setOptions(options: Partial<TransactionModelOptions>) {
this.customOptions = options
return this
}
addAction(
action: string,
handler: StepHandler,

View File

@@ -4,6 +4,7 @@ import {
OrchestratorBuilder,
TransactionHandlerType,
TransactionMetadata,
TransactionModelOptions,
TransactionOrchestrator,
TransactionStepHandler,
TransactionStepsDefinition,
@@ -21,6 +22,7 @@ export interface WorkflowDefinition {
string,
{ invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler }
>
options: TransactionModelOptions
requiredModules?: Set<string>
optionalModules?: Set<string>
}
@@ -72,6 +74,7 @@ export class WorkflowManager {
workflowId: string,
flow: TransactionStepsDefinition | OrchestratorBuilder | undefined,
handlers: WorkflowHandler,
options: TransactionModelOptions = {},
requiredModules?: Set<string>,
optionalModules?: Set<string>
) {
@@ -93,9 +96,14 @@ export class WorkflowManager {
WorkflowManager.workflows.set(workflowId, {
id: workflowId,
flow_: finalFlow!,
orchestrator: new TransactionOrchestrator(workflowId, finalFlow ?? {}),
orchestrator: new TransactionOrchestrator(
workflowId,
finalFlow ?? {},
options
),
handler: WorkflowManager.buildHandlers(handlers),
handlers_: handlers,
options,
requiredModules,
optionalModules,
})
@@ -108,6 +116,7 @@ export class WorkflowManager {
string,
{ invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler }
>,
options: TransactionModelOptions = {},
requiredModules?: Set<string>,
optionalModules?: Set<string>
) {
@@ -126,9 +135,10 @@ export class WorkflowManager {
WorkflowManager.workflows.set(workflowId, {
id: workflowId,
flow_: finalFlow,
orchestrator: new TransactionOrchestrator(workflowId, finalFlow),
orchestrator: new TransactionOrchestrator(workflowId, finalFlow, options),
handler: WorkflowManager.buildHandlers(workflow.handlers_),
handlers_: workflow.handlers_,
options: { ...workflow.options, ...options },
requiredModules,
optionalModules,
})

View File

@@ -1,2 +1,3 @@
export * from "./inject-transaction-manager"
export * from "./inject-manager"
export * from "./inject-shared-context"

View File

@@ -0,0 +1,25 @@
import { Context, SharedContext } from "@medusajs/types"
export function InjectSharedContext(): MethodDecorator {
return function (
target: any,
propertyKey: string | symbol,
descriptor: any
): void {
if (!target.MedusaContextIndex_) {
throw new Error(
`To apply @InjectSharedContext you have to flag a parameter using @MedusaContext`
)
}
const originalMethod = descriptor.value
const argIndex = target.MedusaContextIndex_[propertyKey]
descriptor.value = function (...args: any[]) {
const context: SharedContext | Context = { ...(args[argIndex] ?? {}) }
args[argIndex] = context
return originalMethod.apply(this, args)
}
}
}

View File

@@ -1,5 +1,5 @@
import { MedusaError } from "../common"
import { ModulesSdkTypes } from "@medusajs/types"
import { MedusaError } from "../common"
function getEnv(key: string, moduleName: string): string {
const value =
@@ -45,10 +45,12 @@ function getDatabaseUrl(
config: ModulesSdkTypes.ModuleServiceInitializeOptions
): string {
const { clientUrl, host, port, user, password, database } = config.database!
if (clientUrl) {
return clientUrl
if (host) {
return `postgres://${user}:${password}@${host}:${port}/${database}`
}
return `postgres://${user}:${password}@${host}:${port}/${database}`
return clientUrl!
}
/**
@@ -65,8 +67,9 @@ export function loadDatabaseConfig(
ModulesSdkTypes.ModuleServiceInitializeOptions["database"],
"clientUrl" | "schema" | "driverOptions" | "debug"
> {
const clientUrl = getEnv("POSTGRES_URL", moduleName)
const clientUrl =
options?.database?.clientUrl ?? getEnv("POSTGRES_URL", moduleName)
const database = {
clientUrl,
schema: getEnv("POSTGRES_SCHEMA", moduleName) ?? "public",
@@ -74,16 +77,20 @@ export function loadDatabaseConfig(
getEnv("POSTGRES_DRIVER_OPTIONS", moduleName) ||
JSON.stringify(getDefaultDriverOptions(clientUrl))
),
debug: process.env.NODE_ENV?.startsWith("dev") ?? false,
debug: false,
connection: undefined,
}
if (isModuleServiceInitializeOptions(options)) {
database.clientUrl = getDatabaseUrl(options)
database.clientUrl = getDatabaseUrl({
database: { ...options.database, clientUrl },
})
database.schema = options.database!.schema ?? database.schema
database.driverOptions =
options.database!.driverOptions ??
getDefaultDriverOptions(database.clientUrl)
database.debug = options.database!.debug ?? database.debug
database.connection = options.database!.connection
}
if (!database.clientUrl && !silent) {

View File

@@ -1,5 +1,6 @@
import {
LocalWorkflow,
TransactionModelOptions,
WorkflowHandler,
WorkflowManager,
} from "@medusajs/orchestration"
@@ -150,7 +151,8 @@ export function createWorkflow<
[K in keyof TResult]:
| WorkflowData<TResult[K]>
| WorkflowDataProperties<TResult[K]>
}
},
options?: TransactionModelOptions
): ReturnWorkflow<TData, TResult, THooks> {
const handlers: WorkflowHandler = new Map()
@@ -158,7 +160,7 @@ export function createWorkflow<
WorkflowManager.unregister(name)
}
WorkflowManager.register(name, undefined, handlers)
WorkflowManager.register(name, undefined, handlers, options)
const context: CreateWorkflowComposerContext = {
workflowId: name,