chore(): Reorganize modules (#7210)

**What**
Move all modules to the modules directory
This commit is contained in:
Adrien de Peretti
2024-05-02 17:33:34 +02:00
committed by GitHub
parent 7a351eef09
commit 4eae25e1ef
870 changed files with 91 additions and 62 deletions

View File

@@ -0,0 +1,22 @@
import { Modules } from "@medusajs/modules-sdk"
import { ModulesSdkUtils } from "@medusajs/utils"
import * as models from "@models"
import { moduleDefinition } from "./module-definition"
export default moduleDefinition
const migrationScriptOptions = {
moduleName: Modules.WORKFLOW_ENGINE,
models: models,
pathToMigrations: __dirname + "/migrations",
}
export const runMigrations = ModulesSdkUtils.buildMigrationScript(
migrationScriptOptions
)
export const revertMigration = ModulesSdkUtils.buildRevertMigrationScript(
migrationScriptOptions
)
export * from "./initialize"
export * from "./loaders"

View File

@@ -0,0 +1,35 @@
import {
ExternalModuleDeclaration,
InternalModuleDeclaration,
MODULE_PACKAGE_NAMES,
MedusaModule,
Modules,
} from "@medusajs/modules-sdk"
import { ModulesSdkTypes } from "@medusajs/types"
import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
import { moduleDefinition } from "../module-definition"
import { InitializeModuleInjectableDependencies } from "../types"
export const initialize = async (
options?:
| ModulesSdkTypes.ModuleServiceInitializeOptions
| ModulesSdkTypes.ModuleServiceInitializeCustomDataLayerOptions
| ExternalModuleDeclaration
| InternalModuleDeclaration,
injectedDependencies?: InitializeModuleInjectableDependencies
): Promise<IWorkflowEngineService> => {
const loaded =
// eslint-disable-next-line max-len
await MedusaModule.bootstrap<IWorkflowEngineService>({
moduleKey: Modules.WORKFLOW_ENGINE,
defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE],
declaration: options as
| InternalModuleDeclaration
| ExternalModuleDeclaration,
injectedDependencies,
moduleExports: moduleDefinition,
})
return loaded[Modules.WORKFLOW_ENGINE]
}

View File

@@ -0,0 +1,34 @@
import { Modules } from "@medusajs/modules-sdk"
import { ModuleJoinerConfig } from "@medusajs/types"
import { MapToConfig } from "@medusajs/utils"
import { WorkflowExecution } from "@models"
import moduleSchema from "./schema"
export const LinkableKeys = {
workflow_execution_id: WorkflowExecution.name,
}
const entityLinkableKeysMap: MapToConfig = {}
Object.entries(LinkableKeys).forEach(([key, value]) => {
entityLinkableKeysMap[value] ??= []
entityLinkableKeysMap[value].push({
mapTo: key,
valueFrom: key.split("_").pop()!,
})
})
export const entityNameToLinkableKeysMap: MapToConfig = entityLinkableKeysMap
export const joinerConfig: ModuleJoinerConfig = {
serviceName: Modules.WORKFLOW_ENGINE,
primaryKeys: ["id"],
schema: moduleSchema,
linkableKeys: LinkableKeys,
alias: {
name: ["workflow_execution", "workflow_executions"],
args: {
entity: WorkflowExecution.name,
methodSuffix: "WorkflowExecution",
},
},
}

View File

@@ -0,0 +1,36 @@
import {
InternalModuleDeclaration,
LoaderOptions,
Modules,
} from "@medusajs/modules-sdk"
import { ModulesSdkTypes } from "@medusajs/types"
import { ModulesSdkUtils } from "@medusajs/utils"
import { EntitySchema } from "@mikro-orm/core"
import * as WorkflowOrchestratorModels from "../models"
export default async (
{
options,
container,
logger,
}: LoaderOptions<
| ModulesSdkTypes.ModuleServiceInitializeOptions
| ModulesSdkTypes.ModuleServiceInitializeCustomDataLayerOptions
>,
moduleDeclaration?: InternalModuleDeclaration
): Promise<void> => {
const entities = Object.values(
WorkflowOrchestratorModels
) as unknown as EntitySchema[]
const pathToMigrations = __dirname + "/../migrations"
await ModulesSdkUtils.mikroOrmConnectionLoader({
moduleName: Modules.WORKFLOW_ENGINE,
entities,
container,
options,
moduleDeclaration,
logger,
pathToMigrations,
})
}

View File

@@ -0,0 +1,9 @@
import { MikroOrmBaseRepository, ModulesSdkUtils } from "@medusajs/utils"
import * as ModuleModels from "@models"
import * as ModuleServices from "@services"
export default ModulesSdkUtils.moduleContainerLoaderFactory({
moduleModels: ModuleModels,
moduleServices: ModuleServices,
moduleRepositories: { BaseRepository: MikroOrmBaseRepository },
})

View File

@@ -0,0 +1,4 @@
export * from "./connection"
export * from "./container"
export * from "./redis"
export * from "./utils"

View File

@@ -0,0 +1,88 @@
import { LoaderOptions } from "@medusajs/modules-sdk"
import { asValue } from "awilix"
import Redis from "ioredis"
import { RedisWorkflowsOptions } from "../types"
export default async ({
container,
logger,
options,
dataLoaderOnly,
}: LoaderOptions): Promise<void> => {
const {
url,
options: redisOptions,
pubsub,
} = options?.redis as RedisWorkflowsOptions
// TODO: get default from ENV VAR
if (!url) {
throw Error(
"No `redis.url` provided in `workflowOrchestrator` module options. It is required for the Workflow Orchestrator Redis."
)
}
const cnnPubSub = pubsub ?? { url, options: redisOptions }
const queueName = options?.queueName ?? "medusa-workflows"
let connection
let redisPublisher
let redisSubscriber
let workerConnection
try {
connection = await getConnection(url, redisOptions)
workerConnection = await getConnection(url, {
...(redisOptions ?? {}),
maxRetriesPerRequest: null,
})
logger?.info(
`Connection to Redis in module 'workflow-engine-redis' established`
)
} catch (err) {
logger?.error(
`An error occurred while connecting to Redis in module 'workflow-engine-redis': ${err}`
)
}
try {
redisPublisher = await getConnection(cnnPubSub.url, cnnPubSub.options)
redisSubscriber = await getConnection(cnnPubSub.url, cnnPubSub.options)
logger?.info(
`Connection to Redis PubSub in module 'workflow-engine-redis' established`
)
} catch (err) {
logger?.error(
`An error occurred while connecting to Redis PubSub in module 'workflow-engine-redis': ${err}`
)
}
container.register({
partialLoading: asValue(true),
redisConnection: asValue(connection),
redisWorkerConnection: asValue(workerConnection),
redisPublisher: asValue(redisPublisher),
redisSubscriber: asValue(redisSubscriber),
redisQueueName: asValue(queueName),
redisDisconnectHandler: asValue(async () => {
connection.disconnect()
workerConnection.disconnect()
redisPublisher.disconnect()
redisSubscriber.disconnect()
}),
})
}
async function getConnection(url, redisOptions) {
const connection = new Redis(url, {
lazyConnect: true,
...(redisOptions ?? {}),
})
await new Promise(async (resolve) => {
await connection.connect(resolve)
})
return connection
}

View File

@@ -0,0 +1,11 @@
import { asClass, asValue } from "awilix"
import { RedisDistributedTransactionStorage } from "../utils"
export default async ({ container, dataLoaderOnly }): Promise<void> => {
container.register({
redisDistributedTransactionStorage: asClass(
RedisDistributedTransactionStorage
).singleton(),
dataLoaderOnly: asValue(!!dataLoaderOnly),
})
}

View File

@@ -0,0 +1,41 @@
import { Migration } from "@mikro-orm/migrations"
export class Migration20231221104256 extends Migration {
async up(): Promise<void> {
this.addSql(
`
CREATE TABLE IF NOT EXISTS workflow_execution
(
id character varying NOT NULL,
workflow_id character varying NOT NULL,
transaction_id character varying NOT NULL,
execution jsonb NULL,
context jsonb NULL,
state character varying NOT NULL,
created_at timestamp WITHOUT time zone NOT NULL DEFAULT Now(),
updated_at timestamp WITHOUT time zone NOT NULL DEFAULT Now(),
deleted_at timestamp WITHOUT time zone NULL,
CONSTRAINT "PK_workflow_execution_workflow_id_transaction_id" PRIMARY KEY ("workflow_id", "transaction_id")
);
CREATE UNIQUE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" ("id");
CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" ("workflow_id") WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" ("transaction_id") WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" ("state") WHERE deleted_at IS NULL;
`
)
}
async down(): Promise<void> {
this.addSql(
`
DROP INDEX "IDX_workflow_execution_id";
DROP INDEX "IDX_workflow_execution_workflow_id";
DROP INDEX "IDX_workflow_execution_transaction_id";
DROP INDEX "IDX_workflow_execution_state";
DROP TABLE IF EXISTS workflow_execution;
`
)
}
}

View File

@@ -0,0 +1 @@
export { default as WorkflowExecution } from "./workflow-execution"

View File

@@ -0,0 +1,76 @@
import { TransactionState } from "@medusajs/orchestration"
import { DALUtils, generateEntityId } from "@medusajs/utils"
import {
BeforeCreate,
Entity,
Enum,
Filter,
Index,
OnInit,
OptionalProps,
PrimaryKey,
Property,
Unique,
} from "@mikro-orm/core"
type OptionalFields = "deleted_at"
@Entity()
@Unique({
name: "IDX_workflow_execution_workflow_id_transaction_id_unique",
properties: ["workflow_id", "transaction_id"],
})
@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions)
export default class WorkflowExecution {
[OptionalProps]?: OptionalFields
@Property({ columnType: "text", nullable: false })
@Index({ name: "IDX_workflow_execution_id" })
id!: string
@Index({ name: "IDX_workflow_execution_workflow_id" })
@PrimaryKey({ columnType: "text" })
workflow_id: string
@Index({ name: "IDX_workflow_execution_transaction_id" })
@PrimaryKey({ columnType: "text" })
transaction_id: string
@Property({ columnType: "jsonb", nullable: true })
execution: Record<string, unknown> | null = null
@Property({ columnType: "jsonb", nullable: true })
context: Record<string, unknown> | null = null
@Index({ name: "IDX_workflow_execution_state" })
@Enum(() => TransactionState)
state: TransactionState
@Property({
onCreate: () => new Date(),
columnType: "timestamptz",
defaultRaw: "now()",
})
created_at: Date
@Property({
onCreate: () => new Date(),
onUpdate: () => new Date(),
columnType: "timestamptz",
defaultRaw: "now()",
})
updated_at: Date
@Property({ columnType: "timestamptz", nullable: true })
deleted_at: Date | null = null
@BeforeCreate()
onCreate() {
this.id = generateEntityId(this.id, "wf_exec")
}
@OnInit()
onInit() {
this.id = generateEntityId(this.id, "wf_exec")
}
}

View File

@@ -0,0 +1,19 @@
import { ModuleExports } from "@medusajs/types"
import { WorkflowsModuleService } from "@services"
import loadConnection from "./loaders/connection"
import loadContainer from "./loaders/container"
import redisConnection from "./loaders/redis"
import loadUtils from "./loaders/utils"
const service = WorkflowsModuleService
const loaders = [
loadContainer,
loadConnection,
loadUtils,
redisConnection,
] as any
export const moduleDefinition: ModuleExports = {
service,
loaders,
}

View File

@@ -0,0 +1,2 @@
export { MikroOrmBaseRepository as BaseRepository } from "@medusajs/utils"
export { WorkflowExecutionRepository } from "./workflow-execution"

View File

@@ -0,0 +1,7 @@
import { DALUtils } from "@medusajs/utils"
import { WorkflowExecution } from "@models"
// eslint-disable-next-line max-len
export class WorkflowExecutionRepository extends DALUtils.mikroOrmBaseRepositoryFactory(
WorkflowExecution
) {}

View File

@@ -0,0 +1,26 @@
export default `
scalar DateTime
scalar JSON
enum TransactionState {
NOT_STARTED
INVOKING
WAITING_TO_COMPENSATE
COMPENSATING
DONE
REVERTED
FAILED
}
type WorkflowExecution {
id: ID!
created_at: DateTime!
updated_at: DateTime!
deleted_at: DateTime
workflow_id: string
transaction_id: string
execution: JSON
context: JSON
state: TransactionState
}
`

View File

@@ -0,0 +1,2 @@
export * from "./workflow-orchestrator"
export * from "./workflows-module"

View File

@@ -0,0 +1,637 @@
import {
DistributedTransaction,
DistributedTransactionEvents,
TransactionHandlerType,
TransactionStep,
} from "@medusajs/orchestration"
import {
ContainerLike,
Context,
Logger,
MedusaContainer,
} from "@medusajs/types"
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
import {
FlowRunOptions,
MedusaWorkflow,
resolveValue,
ReturnWorkflow,
} from "@medusajs/workflows-sdk"
import Redis from "ioredis"
import { ulid } from "ulid"
import type { RedisDistributedTransactionStorage } from "../utils"
export type WorkflowOrchestratorRunOptions<T> = Omit<
FlowRunOptions<T>,
"container"
> & {
transactionId?: string
container?: ContainerLike
}
type RegisterStepSuccessOptions<T> = Omit<
WorkflowOrchestratorRunOptions<T>,
"transactionId" | "input"
>
type IdempotencyKeyParts = {
workflowId: string
transactionId: string
stepId: string
action: "invoke" | "compensate"
}
type NotifyOptions = {
eventType: keyof DistributedTransactionEvents
workflowId: string
transactionId?: string
step?: TransactionStep
response?: unknown
result?: unknown
errors?: unknown[]
}
type WorkflowId = string
type TransactionId = string
type SubscriberHandler = {
(input: NotifyOptions): void
} & {
_id?: string
}
type SubscribeOptions = {
workflowId: string
transactionId?: string
subscriber: SubscriberHandler
subscriberId?: string
}
type UnsubscribeOptions = {
workflowId: string
transactionId?: string
subscriberOrId: string | SubscriberHandler
}
type TransactionSubscribers = Map<TransactionId, SubscriberHandler[]>
type Subscribers = Map<WorkflowId, TransactionSubscribers>
const AnySubscriber = "any"
export class WorkflowOrchestratorService {
private instanceId = ulid()
protected redisPublisher: Redis
protected redisSubscriber: Redis
private subscribers: Subscribers = new Map()
private activeStepsCount: number = 0
private logger: Logger
protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage
constructor({
dataLoaderOnly,
redisDistributedTransactionStorage,
redisPublisher,
redisSubscriber,
logger,
}: {
dataLoaderOnly: boolean
redisDistributedTransactionStorage: RedisDistributedTransactionStorage
workflowOrchestratorService: WorkflowOrchestratorService
redisPublisher: Redis
redisSubscriber: Redis
logger: Logger
}) {
this.redisPublisher = redisPublisher
this.redisSubscriber = redisSubscriber
this.logger = logger
redisDistributedTransactionStorage.setWorkflowOrchestratorService(this)
if (!dataLoaderOnly) {
DistributedTransaction.setStorage(redisDistributedTransactionStorage)
}
this.redisDistributedTransactionStorage_ =
redisDistributedTransactionStorage
this.redisSubscriber.on("message", async (_, message) => {
const { instanceId, data } = JSON.parse(message)
await this.notify(data, false, instanceId)
})
}
async onApplicationShutdown() {
await this.redisDistributedTransactionStorage_.onApplicationShutdown()
}
async onApplicationPrepareShutdown() {
// eslint-disable-next-line max-len
await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown()
while (this.activeStepsCount > 0) {
await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
options?: WorkflowOrchestratorRunOptions<T>,
@MedusaContext() sharedContext: Context = {}
) {
let {
input,
context,
transactionId,
resultFrom,
throwOnError,
events: eventHandlers,
container,
} = options ?? {}
const workflowId = isString(workflowIdOrWorkflow)
? workflowIdOrWorkflow
: workflowIdOrWorkflow.getName()
if (!workflowId) {
throw new Error("Workflow ID is required")
}
context ??= {}
context.transactionId ??= transactionId ?? ulid()
const events: FlowRunOptions["events"] = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
workflowId,
transactionId: context.transactionId,
})
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const ret = await flow.run({
input,
throwOnError,
resultFrom,
context,
events,
})
// TODO: temporary
const acknowledgement = {
transactionId: context.transactionId,
workflowId: workflowId,
}
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
transactionId: context.transactionId,
result,
errors,
})
}
return { acknowledgement, ...ret }
}
@InjectSharedContext()
async getRunningTransaction(
workflowId: string,
transactionId: string,
options?: WorkflowOrchestratorRunOptions<undefined>,
@MedusaContext() sharedContext: Context = {}
): Promise<DistributedTransaction> {
let { context, container } = options ?? {}
if (!workflowId) {
throw new Error("Workflow ID is required")
}
if (!transactionId) {
throw new Error("TransactionId ID is required")
}
context ??= {}
context.transactionId ??= transactionId
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const transaction = await flow.getRunningTransaction(transactionId, context)
return transaction
}
@InjectSharedContext()
async setStepSuccess<T = unknown>(
{
idempotencyKey,
stepResponse,
options,
}: {
idempotencyKey: string | IdempotencyKeyParts
stepResponse: unknown
options?: RegisterStepSuccessOptions<T>
},
@MedusaContext() sharedContext: Context = {}
) {
const {
context,
throwOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
transactionId,
workflowId,
})
const ret = await flow.registerStepSuccess({
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
events,
response: stepResponse,
})
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
transactionId,
result,
errors,
})
}
return ret
}
@InjectSharedContext()
async setStepFailure<T = unknown>(
{
idempotencyKey,
stepResponse,
options,
}: {
idempotencyKey: string | IdempotencyKeyParts
stepResponse: unknown
options?: RegisterStepSuccessOptions<T>
},
@MedusaContext() sharedContext: Context = {}
) {
const {
context,
throwOnError,
resultFrom,
container,
events: eventHandlers,
} = options ?? {}
const [idempotencyKey_, { workflowId, transactionId }] =
this.buildIdempotencyKeyAndParts(idempotencyKey)
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
transactionId,
workflowId,
})
const ret = await flow.registerStepFailure({
idempotencyKey: idempotencyKey_,
context,
resultFrom,
throwOnError,
events,
response: stepResponse,
})
if (ret.transaction.hasFinished()) {
const { result, errors } = ret
await this.notify({
eventType: "onFinish",
workflowId,
transactionId,
result,
errors,
})
}
return ret
}
@InjectSharedContext()
subscribe(
{ workflowId, transactionId, subscriber, subscriberId }: SubscribeOptions,
@MedusaContext() sharedContext: Context = {}
) {
subscriber._id = subscriberId
const subscribers = this.subscribers.get(workflowId) ?? new Map()
// Subscribe instance to redis
if (!this.subscribers.has(workflowId)) {
void this.redisSubscriber.subscribe(this.getChannelName(workflowId))
}
const handlerIndex = (handlers) => {
return handlers.indexOf((s) => s === subscriber || s._id === subscriberId)
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
const subscriberIndex = handlerIndex(transactionSubscribers)
if (subscriberIndex !== -1) {
transactionSubscribers.slice(subscriberIndex, 1)
}
transactionSubscribers.push(subscriber)
subscribers.set(transactionId, transactionSubscribers)
this.subscribers.set(workflowId, subscribers)
return
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const subscriberIndex = handlerIndex(workflowSubscribers)
if (subscriberIndex !== -1) {
workflowSubscribers.slice(subscriberIndex, 1)
}
workflowSubscribers.push(subscriber)
subscribers.set(AnySubscriber, workflowSubscribers)
this.subscribers.set(workflowId, subscribers)
}
@InjectSharedContext()
unsubscribe(
{ workflowId, transactionId, subscriberOrId }: UnsubscribeOptions,
@MedusaContext() sharedContext: Context = {}
) {
const subscribers = this.subscribers.get(workflowId) ?? new Map()
const filterSubscribers = (handlers: SubscriberHandler[]) => {
return handlers.filter((handler) => {
return handler._id
? handler._id !== (subscriberOrId as string)
: handler !== (subscriberOrId as SubscriberHandler)
})
}
// Unsubscribe instance
if (!this.subscribers.has(workflowId)) {
void this.redisSubscriber.unsubscribe(this.getChannelName(workflowId))
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
const newTransactionSubscribers = filterSubscribers(
transactionSubscribers
)
subscribers.set(transactionId, newTransactionSubscribers)
this.subscribers.set(workflowId, subscribers)
return
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
const newWorkflowSubscribers = filterSubscribers(workflowSubscribers)
subscribers.set(AnySubscriber, newWorkflowSubscribers)
this.subscribers.set(workflowId, subscribers)
}
private async notify(
options: NotifyOptions,
publish = true,
instanceId = this.instanceId
) {
if (!publish && instanceId === this.instanceId) {
return
}
if (publish) {
const channel = this.getChannelName(options.workflowId)
const message = JSON.stringify({
instanceId: this.instanceId,
data: options,
})
await this.redisPublisher.publish(channel, message)
}
const {
eventType,
workflowId,
transactionId,
errors,
result,
step,
response,
} = options
const subscribers: TransactionSubscribers =
this.subscribers.get(workflowId) ?? new Map()
const notifySubscribers = (handlers: SubscriberHandler[]) => {
handlers.forEach((handler) => {
handler({
eventType,
workflowId,
transactionId,
step,
response,
result,
errors,
})
})
}
if (transactionId) {
const transactionSubscribers = subscribers.get(transactionId) ?? []
notifySubscribers(transactionSubscribers)
}
const workflowSubscribers = subscribers.get(AnySubscriber) ?? []
notifySubscribers(workflowSubscribers)
}
private getChannelName(workflowId: string): string {
return `orchestrator:${workflowId}`
}
private buildWorkflowEvents({
customEventHandlers,
workflowId,
transactionId,
}): DistributedTransactionEvents {
const notify = async ({
eventType,
step,
result,
response,
errors,
}: {
eventType: keyof DistributedTransactionEvents
step?: TransactionStep
response?: unknown
result?: unknown
errors?: unknown[]
}) => {
await this.notify({
workflowId,
transactionId,
eventType,
response,
step,
result,
errors,
})
}
return {
onTimeout: async ({ transaction }) => {
customEventHandlers?.onTimeout?.({ transaction })
await notify({ eventType: "onTimeout" })
},
onBegin: async ({ transaction }) => {
customEventHandlers?.onBegin?.({ transaction })
await notify({ eventType: "onBegin" })
},
onResume: async ({ transaction }) => {
customEventHandlers?.onResume?.({ transaction })
await notify({ eventType: "onResume" })
},
onCompensateBegin: async ({ transaction }) => {
customEventHandlers?.onCompensateBegin?.({ transaction })
await notify({ eventType: "onCompensateBegin" })
},
onFinish: async ({ transaction, result, errors }) => {
// TODO: unsubscribe transaction handlers on finish
customEventHandlers?.onFinish?.({ transaction, result, errors })
},
onStepBegin: async ({ step, transaction }) => {
customEventHandlers?.onStepBegin?.({ step, transaction })
this.activeStepsCount++
await notify({ eventType: "onStepBegin", step })
},
onStepSuccess: async ({ step, transaction }) => {
const stepName = step.definition.action!
const response = await resolveValue(
transaction.getContext().invoke[stepName],
transaction
)
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
await notify({ eventType: "onStepSuccess", step, response })
this.activeStepsCount--
},
onStepFailure: async ({ step, transaction }) => {
const stepName = step.definition.action!
const errors = transaction
.getErrors(TransactionHandlerType.INVOKE)
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onStepFailure", step, errors })
this.activeStepsCount--
},
onStepAwaiting: async ({ step, transaction }) => {
customEventHandlers?.onStepAwaiting?.({ step, transaction })
await notify({ eventType: "onStepAwaiting", step })
this.activeStepsCount--
},
onCompensateStepSuccess: async ({ step, transaction }) => {
const stepName = step.definition.action!
const response = transaction.getContext().compensate[stepName]
customEventHandlers?.onCompensateStepSuccess?.({
step,
transaction,
response,
})
await notify({ eventType: "onCompensateStepSuccess", step, response })
},
onCompensateStepFailure: async ({ step, transaction }) => {
const stepName = step.definition.action!
const errors = transaction
.getErrors(TransactionHandlerType.COMPENSATE)
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onCompensateStepFailure", step, errors })
},
}
}
private buildIdempotencyKeyAndParts(
idempotencyKey: string | IdempotencyKeyParts
): [string, IdempotencyKeyParts] {
const parts: IdempotencyKeyParts = {
workflowId: "",
transactionId: "",
stepId: "",
action: "invoke",
}
let idempotencyKey_ = idempotencyKey as string
const setParts = (workflowId, transactionId, stepId, action) => {
parts.workflowId = workflowId
parts.transactionId = transactionId
parts.stepId = stepId
parts.action = action
}
if (!isString(idempotencyKey)) {
const { workflowId, transactionId, stepId, action } =
idempotencyKey as IdempotencyKeyParts
idempotencyKey_ = [workflowId, transactionId, stepId, action].join(":")
setParts(workflowId, transactionId, stepId, action)
} else {
const [workflowId, transactionId, stepId, action] =
idempotencyKey_.split(":")
setParts(workflowId, transactionId, stepId, action)
}
return [idempotencyKey_, parts]
}
}

View File

@@ -0,0 +1,285 @@
import {
Context,
DAL,
FindConfig,
InternalModuleDeclaration,
ModuleJoinerConfig,
ModulesSdkTypes,
} from "@medusajs/types"
import {
InjectManager,
InjectSharedContext,
MedusaContext,
MedusaError,
isString,
} from "@medusajs/utils"
import type {
IWorkflowEngineService,
ReturnWorkflow,
UnwrapWorkflowInputDataType,
WorkflowOrchestratorTypes,
} from "@medusajs/workflows-sdk"
import { WorkflowOrchestratorService } from "@services"
import { joinerConfig } from "../joiner-config"
type InjectedDependencies = {
baseRepository: DAL.RepositoryService
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>
workflowOrchestratorService: WorkflowOrchestratorService
redisDisconnectHandler: () => Promise<void>
}
export class WorkflowsModuleService implements IWorkflowEngineService {
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected redisDisconnectHandler_: () => Promise<void>
constructor(
{
baseRepository,
workflowExecutionService,
workflowOrchestratorService,
redisDisconnectHandler,
}: InjectedDependencies,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
this.baseRepository_ = baseRepository
this.workflowExecutionService_ = workflowExecutionService
this.workflowOrchestratorService_ = workflowOrchestratorService
this.redisDisconnectHandler_ = redisDisconnectHandler
}
__joinerConfig(): ModuleJoinerConfig {
return joinerConfig
}
__hooks = {
onApplicationShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationShutdown()
await this.redisDisconnectHandler_()
},
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
}
@InjectManager("baseRepository_")
async retrieveWorkflowExecution(
idOrObject:
| string
| {
workflow_id: string
transaction_id: string
},
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowOrchestratorTypes.WorkflowExecutionDTO> {
const objValue = isString(idOrObject)
? { id: idOrObject }
: {
workflow_id: idOrObject.workflow_id,
transaction_id: idOrObject.transaction_id,
}
const wfExecution = await this.workflowExecutionService_.list(
objValue,
config,
sharedContext
)
if (wfExecution.length === 0) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`WorkflowExecution with ${Object.keys(objValue).join(
", "
)}: ${Object.values(objValue).join(", ")} was not found`
)
}
// eslint-disable-next-line max-len
return await this.baseRepository_.serialize<WorkflowOrchestratorTypes.WorkflowExecutionDTO>(
wfExecution[0],
{
populate: true,
}
)
}
@InjectManager("baseRepository_")
async listWorkflowExecution(
filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<WorkflowOrchestratorTypes.WorkflowExecutionDTO[]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const wfExecutions = await this.workflowExecutionService_.list(
filters,
config,
sharedContext
)
return await this.baseRepository_.serialize<
WorkflowOrchestratorTypes.WorkflowExecutionDTO[]
>(wfExecutions, {
populate: true,
})
}
@InjectManager("baseRepository_")
async listAndCountWorkflowExecution(
filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowOrchestratorTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> {
if (filters.transaction_id) {
if (Array.isArray(filters.transaction_id)) {
filters.transaction_id = {
$in: filters.transaction_id,
}
}
}
if (filters.workflow_id) {
if (Array.isArray(filters.workflow_id)) {
filters.workflow_id = {
$in: filters.workflow_id,
}
}
}
const [wfExecutions, count] =
await this.workflowExecutionService_.listAndCount(
filters,
config,
sharedContext
)
return [
await this.baseRepository_.serialize<
WorkflowOrchestratorTypes.WorkflowExecutionDTO[]
>(wfExecutions, {
populate: true,
}),
count,
]
}
@InjectSharedContext()
async run<TWorkflow extends string | ReturnWorkflow<any, any, any>>(
workflowIdOrWorkflow: TWorkflow,
options: WorkflowOrchestratorTypes.WorkflowOrchestratorRunDTO<
TWorkflow extends ReturnWorkflow<any, any, any>
? UnwrapWorkflowInputDataType<TWorkflow>
: unknown
> = {},
@MedusaContext() context: Context = {}
) {
const ret = await this.workflowOrchestratorService_.run<
TWorkflow extends ReturnWorkflow<any, any, any>
? UnwrapWorkflowInputDataType<TWorkflow>
: unknown
>(workflowIdOrWorkflow, options, context)
return ret as any
}
@InjectSharedContext()
async getRunningTransaction(
workflowId: string,
transactionId: string,
@MedusaContext() context: Context = {}
) {
return await this.workflowOrchestratorService_.getRunningTransaction(
workflowId,
transactionId,
context
)
}
@InjectSharedContext()
async setStepSuccess(
{
idempotencyKey,
stepResponse,
options,
}: {
idempotencyKey: string | object
stepResponse: unknown
options?: Record<string, any>
},
@MedusaContext() context: Context = {}
) {
return await this.workflowOrchestratorService_.setStepSuccess(
{
idempotencyKey,
stepResponse,
options,
} as any,
context
)
}
@InjectSharedContext()
async setStepFailure(
{
idempotencyKey,
stepResponse,
options,
}: {
idempotencyKey: string | object
stepResponse: unknown
options?: Record<string, any>
},
@MedusaContext() context: Context = {}
) {
return await this.workflowOrchestratorService_.setStepFailure(
{
idempotencyKey,
stepResponse,
options,
} as any,
context
)
}
@InjectSharedContext()
async subscribe(
args: {
workflowId: string
transactionId?: string
subscriber: Function
subscriberId?: string
},
@MedusaContext() context: Context = {}
) {
return this.workflowOrchestratorService_.subscribe(args as any, context)
}
@InjectSharedContext()
async unsubscribe(
args: {
workflowId: string
transactionId?: string
subscriberOrId: string | Function
},
@MedusaContext() context: Context = {}
) {
return this.workflowOrchestratorService_.unsubscribe(args as any, context)
}
}

View File

@@ -0,0 +1,34 @@
import { Logger } from "@medusajs/types"
import { RedisOptions } from "ioredis"
export type InitializeModuleInjectableDependencies = {
logger?: Logger
}
/**
* Module config type
*/
export type RedisWorkflowsOptions = {
/**
* Redis connection string
*/
url?: string
/**
* Queue name used for retries and timeouts
*/
queueName?: string
/**
* Redis client options
*/
options?: RedisOptions
/**
* Optiona connection string and options to pub/sub
*/
pubsub?: {
url: string
options?: RedisOptions
}
}

View File

@@ -0,0 +1 @@
export * from "./workflow-orchestrator-storage"

View File

@@ -0,0 +1,293 @@
import {
DistributedTransaction,
DistributedTransactionStorage,
TransactionCheckpoint,
TransactionStep,
} from "@medusajs/orchestration"
import { ModulesSdkTypes } from "@medusajs/types"
import { TransactionState } from "@medusajs/utils"
import { WorkflowOrchestratorService } from "@services"
import { Queue, Worker } from "bullmq"
import Redis from "ioredis"
enum JobType {
RETRY = "retry",
STEP_TIMEOUT = "step_timeout",
TRANSACTION_TIMEOUT = "transaction_timeout",
}
// eslint-disable-next-line max-len
export class RedisDistributedTransactionStorage extends DistributedTransactionStorage {
private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes
private workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
private workflowOrchestratorService_: WorkflowOrchestratorService
private redisClient: Redis
private queue: Queue
private worker: Worker
constructor({
workflowExecutionService,
redisConnection,
redisWorkerConnection,
redisQueueName,
}: {
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>
redisConnection: Redis
redisWorkerConnection: Redis
redisQueueName: string
}) {
super()
this.workflowExecutionService_ = workflowExecutionService
this.redisClient = redisConnection
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
this.worker = new Worker(
redisQueueName,
async (job) => {
const allJobs = [
JobType.RETRY,
JobType.STEP_TIMEOUT,
JobType.TRANSACTION_TIMEOUT,
]
if (allJobs.includes(job.name as JobType)) {
await this.executeTransaction(
job.data.workflowId,
job.data.transactionId
)
}
},
{ connection: redisWorkerConnection }
)
}
async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}
async onApplicationShutdown() {
await this.queue.close()
}
setWorkflowOrchestratorService(workflowOrchestratorService) {
this.workflowOrchestratorService_ = workflowOrchestratorService
}
private async saveToDb(data: TransactionCheckpoint) {
await this.workflowExecutionService_.upsert([
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
execution: data.flow,
context: {
data: data.context,
errors: data.errors,
},
state: data.flow.state,
},
])
}
private async deleteFromDb(data: TransactionCheckpoint) {
await this.workflowExecutionService_.delete([
{
workflow_id: data.flow.modelId,
transaction_id: data.flow.transactionId,
},
])
}
private async executeTransaction(workflowId: string, transactionId: string) {
return await this.workflowOrchestratorService_.run(workflowId, {
transactionId,
throwOnError: false,
})
}
async get(key: string): Promise<TransactionCheckpoint | undefined> {
const data = await this.redisClient.get(key)
return data ? JSON.parse(data) : undefined
}
async list(): Promise<TransactionCheckpoint[]> {
const keys = await this.redisClient.keys(
DistributedTransaction.keyPrefix + ":*"
)
const transactions: any[] = []
for (const key of keys) {
const data = await this.redisClient.get(key)
if (data) {
transactions.push(JSON.parse(data))
}
}
return transactions
}
async save(
key: string,
data: TransactionCheckpoint,
ttl?: number
): Promise<void> {
let retentionTime
/**
* Store the retention time only if the transaction is done, failed or reverted.
* From that moment, this tuple can be later on archived or deleted after the retention time.
*/
const hasFinished = [
TransactionState.DONE,
TransactionState.FAILED,
TransactionState.REVERTED,
].includes(data.flow.state)
if (hasFinished) {
retentionTime = data.flow.options?.retentionTime
Object.assign(data, {
retention_time: retentionTime,
})
}
const stringifiedData = JSON.stringify(data)
const parsedData = JSON.parse(stringifiedData)
if (!hasFinished) {
if (ttl) {
await this.redisClient.set(key, stringifiedData, "EX", ttl)
} else {
await this.redisClient.set(key, stringifiedData)
}
}
if (hasFinished && !retentionTime) {
await this.deleteFromDb(parsedData)
} else {
await this.saveToDb(parsedData)
}
if (hasFinished) {
// await this.redisClient.del(key)
await this.redisClient.set(
key,
stringifiedData,
"EX",
RedisDistributedTransactionStorage.TTL_AFTER_COMPLETED
)
}
}
async scheduleRetry(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void> {
await this.queue.add(
JobType.RETRY,
{
workflowId: transaction.modelId,
transactionId: transaction.transactionId,
stepId: step.id,
},
{
delay: interval > 0 ? interval * 1000 : undefined,
jobId: this.getJobId(JobType.RETRY, transaction, step),
removeOnComplete: true,
}
)
}
async clearRetry(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
await this.removeJob(JobType.RETRY, transaction, step)
}
async scheduleTransactionTimeout(
transaction: DistributedTransaction,
timestamp: number,
interval: number
): Promise<void> {
await this.queue.add(
JobType.TRANSACTION_TIMEOUT,
{
workflowId: transaction.modelId,
transactionId: transaction.transactionId,
},
{
delay: interval * 1000,
jobId: this.getJobId(JobType.TRANSACTION_TIMEOUT, transaction),
removeOnComplete: true,
}
)
}
async clearTransactionTimeout(
transaction: DistributedTransaction
): Promise<void> {
await this.removeJob(JobType.TRANSACTION_TIMEOUT, transaction)
}
async scheduleStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep,
timestamp: number,
interval: number
): Promise<void> {
await this.queue.add(
JobType.STEP_TIMEOUT,
{
workflowId: transaction.modelId,
transactionId: transaction.transactionId,
stepId: step.id,
},
{
delay: interval * 1000,
jobId: this.getJobId(JobType.STEP_TIMEOUT, transaction, step),
removeOnComplete: true,
}
)
}
async clearStepTimeout(
transaction: DistributedTransaction,
step: TransactionStep
): Promise<void> {
await this.removeJob(JobType.STEP_TIMEOUT, transaction, step)
}
private getJobId(
type: JobType,
transaction: DistributedTransaction,
step?: TransactionStep
) {
const key = [type, transaction.modelId, transaction.transactionId]
if (step) {
key.push(step.id, step.attempts + "")
if (step.isCompensating()) {
key.push("compensate")
}
}
return key.join(":")
}
private async removeJob(
type: JobType,
transaction: DistributedTransaction,
step?: TransactionStep
) {
const jobId = this.getJobId(type, transaction, step)
const job = await this.queue.getJob(jobId)
if (job && job.attemptsStarted === 0) {
await job.remove()
}
}
}