feat(medusa): workflow engine api (#6330)

What:

   Workflow Engine API.
   Endpoints for:
     - List workflow executions
     - Run a workflow
     - Set async steps as success or failure
     - Retrieve the details of a workflow run
This commit is contained in:
Carlos R. L. Rodrigues
2024-02-13 12:19:10 -03:00
committed by GitHub
parent 59ab66a773
commit 0c2a460751
44 changed files with 1425 additions and 80 deletions

View File

@@ -1,12 +1,13 @@
import {
ExternalModuleDeclaration,
InternalModuleDeclaration,
MedusaModule,
MODULE_PACKAGE_NAMES,
MedusaModule,
Modules,
} from "@medusajs/modules-sdk"
import { ModulesSdkTypes } from "@medusajs/types"
import { WorkflowOrchestratorTypes } from "@medusajs/workflows-sdk"
import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
import { moduleDefinition } from "../module-definition"
import { InitializeModuleInjectableDependencies } from "../types"
@@ -17,20 +18,18 @@ export const initialize = async (
| ExternalModuleDeclaration
| InternalModuleDeclaration,
injectedDependencies?: InitializeModuleInjectableDependencies
): Promise<WorkflowOrchestratorTypes.IWorkflowsModuleService> => {
): Promise<IWorkflowEngineService> => {
const loaded =
// eslint-disable-next-line max-len
await MedusaModule.bootstrap<WorkflowOrchestratorTypes.IWorkflowsModuleService>(
{
moduleKey: Modules.WORKFLOW_ENGINE,
defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE],
declaration: options as
| InternalModuleDeclaration
| ExternalModuleDeclaration,
injectedDependencies,
moduleExports: moduleDefinition,
}
)
await MedusaModule.bootstrap<IWorkflowEngineService>({
moduleKey: Modules.WORKFLOW_ENGINE,
defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE],
declaration: options as
| InternalModuleDeclaration
| ExternalModuleDeclaration,
injectedDependencies,
moduleExports: moduleDefinition,
})
return loaded[Modules.WORKFLOW_ENGINE]
}

View File

@@ -10,14 +10,17 @@ import {
InjectManager,
InjectSharedContext,
MedusaContext,
MedusaError,
isString,
} from "@medusajs/utils"
import type {
IWorkflowEngineService,
ReturnWorkflow,
UnwrapWorkflowInputDataType,
WorkflowOrchestratorTypes,
} from "@medusajs/workflows-sdk"
import {WorkflowOrchestratorService} from "@services"
import {joinerConfig} from "../joiner-config"
import { WorkflowOrchestratorService } from "@services"
import { joinerConfig } from "../joiner-config"
type InjectedDependencies = {
baseRepository: DAL.RepositoryService
@@ -25,9 +28,7 @@ type InjectedDependencies = {
workflowOrchestratorService: WorkflowOrchestratorService
}
export class WorkflowsModuleService
implements WorkflowOrchestratorTypes.IWorkflowsModuleService
{
export class WorkflowsModuleService implements IWorkflowEngineService {
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
protected workflowOrchestratorService_: WorkflowOrchestratorService
@@ -49,12 +50,70 @@ export class WorkflowsModuleService
return joinerConfig
}
@InjectManager("baseRepository_")
async retrieveWorkflowExecution(
idOrObject:
| string
| {
workflow_id: string
transaction_id: string
},
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowOrchestratorTypes.WorkflowExecutionDTO> {
const objValue = isString(idOrObject)
? { id: idOrObject }
: {
workflow_id: idOrObject.workflow_id,
transaction_id: idOrObject.transaction_id,
}
const wfExecution = await this.workflowExecutionService_.list(
objValue,
config,
sharedContext
)
if (wfExecution.length === 0) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`WorkflowExecution with ${Object.keys(objValue).join(
", "
)}: ${Object.values(objValue).join(", ")} was not found`
)
}
// eslint-disable-next-line max-len
return await this.baseRepository_.serialize<WorkflowOrchestratorTypes.WorkflowExecutionDTO>(
wfExecution[0],
{
populate: true,
}
)
}
@InjectManager("baseRepository_")
async listWorkflowExecution(
filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowOrchestratorTypes.WorkflowExecutionDTO[]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const wfExecutions = await this.workflowExecutionService_.list(
filters,
config,
@@ -74,6 +133,22 @@ export class WorkflowsModuleService
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const [wfExecutions, count] =
await this.workflowExecutionService_.listAndCount(
filters,

View File

@@ -4,11 +4,9 @@ import {
TransactionCheckpoint,
TransactionStep,
} from "@medusajs/orchestration"
import { TransactionState } from "@medusajs/utils"
import { ModulesSdkTypes } from "@medusajs/types"
import {
WorkflowOrchestratorService,
} from "@services"
import { TransactionState } from "@medusajs/utils"
import { WorkflowOrchestratorService } from "@services"
import { Queue, Worker } from "bullmq"
import Redis from "ioredis"
@@ -21,7 +19,7 @@ enum JobType {
// eslint-disable-next-line max-len
export class RedisDistributedTransactionStorage extends DistributedTransactionStorage {
private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes
private workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
private workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
private workflowOrchestratorService_: WorkflowOrchestratorService
private redisClient: Redis
@@ -34,7 +32,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
redisWorkerConnection,
redisQueueName,
}: {
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>,
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>
redisConnection: Redis
redisWorkerConnection: Redis
redisQueueName: string
@@ -161,33 +159,28 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
})
}
const stringifiedData = JSON.stringify(data, this.stringifyWithSymbol)
const parsedData = JSON.parse(stringifiedData)
if (!hasFinished) {
if (ttl) {
await this.redisClient.set(
key,
JSON.stringify(data, this.stringifyWithSymbol),
"EX",
ttl
)
await this.redisClient.set(key, stringifiedData, "EX", ttl)
} else {
await this.redisClient.set(
key,
JSON.stringify(data, this.stringifyWithSymbol)
)
await this.redisClient.set(key, stringifiedData)
}
}
if (hasFinished && !retentionTime) {
await this.deleteFromDb(data)
await this.deleteFromDb(parsedData)
} else {
await this.saveToDb(data)
await this.saveToDb(parsedData)
}
if (hasFinished) {
// await this.redisClient.del(key)
await this.redisClient.set(
key,
JSON.stringify(data, this.stringifyWithSymbol),
stringifiedData,
"EX",
RedisDistributedTransactionStorage.TTL_AFTER_COMPLETED
)