chore(orchestration): idempotent (#7771)

This commit is contained in:
Carlos R. L. Rodrigues
2024-06-25 10:34:00 -03:00
committed by GitHub
parent 66d17fabde
commit 5600e58b7f
19 changed files with 421 additions and 403 deletions
@@ -3,3 +3,4 @@ import { moduleDefinition } from "./module-definition"
export default moduleDefinition
export * from "./loaders"
export * from "./models"
@@ -1,26 +1,23 @@
import {
Context,
DAL,
FindConfig,
InternalModuleDeclaration,
IWorkflowEngineService,
ModuleJoinerConfig,
ModulesSdkTypes,
WorkflowsSdkTypes,
} from "@medusajs/types"
import {
InjectManager,
InjectSharedContext,
isString,
MedusaContext,
MedusaError,
ModulesSdkUtils,
} from "@medusajs/utils"
import type {
ReturnWorkflow,
UnwrapWorkflowInputDataType,
} from "@medusajs/workflows-sdk"
import { WorkflowExecution } from "@models"
import { WorkflowOrchestratorService } from "@services"
import { joinerConfig } from "../joiner-config"
import { entityNameToLinkableKeysMap, joinerConfig } from "../joiner-config"
type InjectedDependencies = {
baseRepository: DAL.RepositoryService
@@ -28,9 +25,13 @@ type InjectedDependencies = {
workflowOrchestratorService: WorkflowOrchestratorService
}
export class WorkflowsModuleService implements IWorkflowEngineService {
export class WorkflowsModuleService<
TWorkflowExecution extends WorkflowExecution = WorkflowExecution
> extends ModulesSdkUtils.MedusaService<{
WorkflowExecution: { dto: WorkflowExecution }
}>({ WorkflowExecution }, entityNameToLinkableKeysMap) {
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<any>
protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<TWorkflowExecution>
protected workflowOrchestratorService_: WorkflowOrchestratorService
constructor(
@@ -41,6 +42,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
}: InjectedDependencies,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
// @ts-ignore
super(...arguments)
this.baseRepository_ = baseRepository
this.workflowExecutionService_ = workflowExecutionService
this.workflowOrchestratorService_ = workflowOrchestratorService
@@ -50,122 +54,6 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
return joinerConfig
}
@InjectManager("baseRepository_")
async retrieveWorkflowExecution(
idOrObject:
| string
| {
workflow_id: string
transaction_id: string
},
config: FindConfig<WorkflowsSdkTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowsSdkTypes.WorkflowExecutionDTO> {
const objValue = isString(idOrObject)
? { id: idOrObject }
: {
workflow_id: idOrObject.workflow_id,
transaction_id: idOrObject.transaction_id,
}
const wfExecution = await this.workflowExecutionService_.list(
objValue,
config,
sharedContext
)
if (wfExecution.length === 0) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`WorkflowExecution with ${Object.keys(objValue).join(
", "
)}: ${Object.values(objValue).join(", ")} was not found`
)
}
// eslint-disable-next-line max-len
return await this.baseRepository_.serialize<WorkflowsSdkTypes.WorkflowExecutionDTO>(
wfExecution[0],
{
populate: true,
}
)
}
@InjectManager("baseRepository_")
async listWorkflowExecutions(
filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowsSdkTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowsSdkTypes.WorkflowExecutionDTO[]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const wfExecutions = await this.workflowExecutionService_.list(
filters,
config,
sharedContext
)
return await this.baseRepository_.serialize<
WorkflowsSdkTypes.WorkflowExecutionDTO[]
>(wfExecutions, {
populate: true,
})
}
@InjectManager("baseRepository_")
async listAndCountWorkflowExecutions(
filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowsSdkTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<[WorkflowsSdkTypes.WorkflowExecutionDTO[], number]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const [wfExecutions, count] =
await this.workflowExecutionService_.listAndCount(
filters,
config,
sharedContext
)
return [
await this.baseRepository_.serialize<
WorkflowsSdkTypes.WorkflowExecutionDTO[]
>(wfExecutions, {
populate: true,
}),
count,
]
}
@InjectSharedContext()
async run<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
workflowIdOrWorkflow: TWorkflow,
@@ -1,22 +1,22 @@
import {
DistributedTransaction,
DistributedTransactionStorage,
IDistributedSchedulerStorage,
IDistributedTransactionStorage,
SchedulerOptions,
TransactionCheckpoint,
TransactionOptions,
TransactionStep,
} from "@medusajs/orchestration"
import { ModulesSdkTypes } from "@medusajs/types"
import { Logger, ModulesSdkTypes } from "@medusajs/types"
import { MedusaError, TransactionState } from "@medusajs/utils"
import { WorkflowOrchestratorService } from "@services"
import { CronExpression, parseExpression } from "cron-parser"
// eslint-disable-next-line max-len
export class InMemoryDistributedTransactionStorage
implements IDistributedTransactionStorage, IDistributedSchedulerStorage
{
private workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService<any>
private logger_: Logger
private workflowOrchestratorService_: WorkflowOrchestratorService
private storage: Map<string, TransactionCheckpoint> = new Map()
@@ -34,10 +34,13 @@ export class InMemoryDistributedTransactionStorage
constructor({
workflowExecutionService,
logger,
}: {
workflowExecutionService: ModulesSdkTypes.IMedusaInternalService<any>
logger: Logger
}) {
this.workflowExecutionService_ = workflowExecutionService
this.logger_ = logger
}
setWorkflowOrchestratorService(workflowOrchestratorService) {
@@ -68,24 +71,43 @@ export class InMemoryDistributedTransactionStorage
])
}
/*private stringifyWithSymbol(key, value) {
if (key === "__type" && typeof value === "symbol") {
return Symbol.keyFor(value)
async get(
key: string,
options?: TransactionOptions
): Promise<TransactionCheckpoint | undefined> {
const data = this.storage.get(key)
if (data) {
return data
}
return value
}
private jsonWithSymbol(key, value) {
if (key === "__type" && typeof value === "string") {
return Symbol.for(value)
const { idempotent } = options ?? {}
if (!idempotent) {
return
}
return value
}*/
const [_, workflowId, transactionId] = key.split(":")
const trx = await this.workflowExecutionService_
.retrieve(
{
workflow_id: workflowId,
transaction_id: transactionId,
},
{
select: ["execution", "context"],
}
)
.catch(() => undefined)
async get(key: string): Promise<TransactionCheckpoint | undefined> {
return this.storage.get(key)
if (trx) {
return {
flow: trx.execution,
context: trx.context.data,
errors: trx.context.errors,
}
}
return
}
async list(): Promise<TransactionCheckpoint[]> {
@@ -95,12 +117,11 @@ export class InMemoryDistributedTransactionStorage
async save(
key: string,
data: TransactionCheckpoint,
ttl?: number
ttl?: number,
options?: TransactionOptions
): Promise<void> {
this.storage.set(key, data)
let retentionTime
/**
* Store the retention time only if the transaction is done, failed or reverted.
* From that moment, this tuple can be later on archived or deleted after the retention time.
@@ -111,8 +132,9 @@ export class InMemoryDistributedTransactionStorage
TransactionState.REVERTED,
].includes(data.flow.state)
const { retentionTime, idempotent } = options ?? {}
if (hasFinished) {
retentionTime = data.flow.options?.retentionTime
Object.assign(data, {
retention_time: retentionTime,
})
@@ -121,7 +143,7 @@ export class InMemoryDistributedTransactionStorage
const stringifiedData = JSON.stringify(data)
const parsedData = JSON.parse(stringifiedData)
if (hasFinished && !retentionTime) {
if (hasFinished && !retentionTime && !idempotent) {
await this.deleteFromDb(parsedData)
} else {
await this.saveToDb(parsedData)
@@ -304,7 +326,7 @@ export class InMemoryDistributedTransactionStorage
})
} catch (e) {
if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) {
console.warn(
this.logger_?.warn(
`Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.`
)