diff --git a/.changeset/wild-eagles-count.md b/.changeset/wild-eagles-count.md new file mode 100644 index 0000000000..a012c4f9b2 --- /dev/null +++ b/.changeset/wild-eagles-count.md @@ -0,0 +1,9 @@ +--- +"@medusajs/orchestration": minor +"@medusajs/modules-sdk": minor +"@medusajs/workflows": minor +"@medusajs/medusa": minor +"@medusajs/types": minor +--- + +Medusa workflows package diff --git a/integration-tests/api/__tests__/store/__snapshots__/product-variants.js.snap b/integration-tests/api/__tests__/store/__snapshots__/product-variants.js.snap index 02a1a260e2..dcf425cb92 100644 --- a/integration-tests/api/__tests__/store/__snapshots__/product-variants.js.snap +++ b/integration-tests/api/__tests__/store/__snapshots__/product-variants.js.snap @@ -95,7 +95,7 @@ Object { "metadata": null, "option_id": "test-option", "updated_at": Any, - "value": "Default variant", + "value": Any, "variant_id": "test-variant", }, ], @@ -187,7 +187,7 @@ Object { "metadata": null, "option_id": Any, "updated_at": Any, - "value": "Practical", + "value": Any, "variant_id": Any, }, ], diff --git a/integration-tests/api/__tests__/store/product-variants.js b/integration-tests/api/__tests__/store/product-variants.js index 4d49e4b96d..01b10d9663 100644 --- a/integration-tests/api/__tests__/store/product-variants.js +++ b/integration-tests/api/__tests__/store/product-variants.js @@ -225,6 +225,7 @@ describe("/store/variants", () => { { created_at: expect.any(String), updated_at: expect.any(String), + value: expect.any(String), }, ], prices: [ @@ -361,6 +362,7 @@ describe("/store/variants", () => { id: expect.any(String), option_id: expect.any(String), variant_id: expect.any(String), + value: expect.any(String), }, ], prices: [ diff --git a/integration-tests/factories/simple-product-factory.ts b/integration-tests/factories/simple-product-factory.ts index a5a8ce112b..703a66e491 100644 --- a/integration-tests/factories/simple-product-factory.ts +++ b/integration-tests/factories/simple-product-factory.ts @@ -7,8 +7,6 @@ import { ShippingProfileType, Store, } from "@medusajs/medusa" -import faker from "faker" -import { DataSource } from "typeorm" import { ProductVariantFactoryData, simpleProductVariantFactory, @@ -18,6 +16,9 @@ import { simpleSalesChannelFactory, } from "./simple-sales-channel-factory" +import { DataSource } from "typeorm" +import faker from "faker" + export type ProductFactoryData = { id?: string is_giftcard?: boolean diff --git a/integration-tests/plugins/__tests__/medusa-plugin-sendgrid/__snapshots__/index.js.snap b/integration-tests/plugins/__tests__/medusa-plugin-sendgrid/__snapshots__/index.js.snap index 1330dbea87..9a07da02fb 100644 --- a/integration-tests/plugins/__tests__/medusa-plugin-sendgrid/__snapshots__/index.js.snap +++ b/integration-tests/plugins/__tests__/medusa-plugin-sendgrid/__snapshots__/index.js.snap @@ -2533,4 +2533,4 @@ Object { "tracking_links": Array [], "tracking_number": "", } -`; +`; \ No newline at end of file diff --git a/packages/medusa/package.json b/packages/medusa/package.json index 69bd7608a4..f7e225fb5d 100644 --- a/packages/medusa/package.json +++ b/packages/medusa/package.json @@ -48,6 +48,7 @@ "dependencies": { "@medusajs/medusa-cli": "^1.3.17", "@medusajs/modules-sdk": "^1.8.8", + "@medusajs/orchestration": "^0.0.2", "@medusajs/utils": "^1.9.2", "awilix": "^8.0.0", "body-parser": "^1.19.0", diff --git a/packages/medusa/src/api/routes/admin/inventory-items/transaction/create-inventory-item.ts b/packages/medusa/src/api/routes/admin/inventory-items/transaction/create-inventory-item.ts index 92b02d1ce6..7d21ec4cfd 100644 --- a/packages/medusa/src/api/routes/admin/inventory-items/transaction/create-inventory-item.ts +++ b/packages/medusa/src/api/routes/admin/inventory-items/transaction/create-inventory-item.ts @@ -5,7 +5,7 @@ import { TransactionPayload, TransactionState, TransactionStepsDefinition, -} from "../../../../../utils/transaction" +} from "@medusajs/orchestration" import { IInventoryService, InventoryItemDTO } from "@medusajs/types" import { ProductVariantInventoryService, diff --git a/packages/medusa/src/api/routes/admin/products/create-product.ts b/packages/medusa/src/api/routes/admin/products/create-product.ts index fb7f23678f..73c8209db3 100644 --- a/packages/medusa/src/api/routes/admin/products/create-product.ts +++ b/packages/medusa/src/api/routes/admin/products/create-product.ts @@ -1,3 +1,7 @@ +import { + CreateProductVariantInput, + ProductVariantPricesCreateReq, +} from "../../../../types/product-variant" import { IsArray, IsBoolean, @@ -8,7 +12,6 @@ import { IsString, ValidateNested, } from "class-validator" -import { defaultAdminProductFields, defaultAdminProductRelations } from "." import { PricingService, ProductService, @@ -23,26 +26,23 @@ import { ProductTagReq, ProductTypeReq, } from "../../../../types/product" -import { - CreateProductVariantInput, - ProductVariantPricesCreateReq, -} from "../../../../types/product-variant" - -import { IInventoryService } from "@medusajs/types" -import { Type } from "class-transformer" -import { EntityManager } from "typeorm" -import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels" -import { ProductStatus } from "../../../../models" -import { Logger } from "../../../../types/global" -import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" -import { FlagRouter } from "../../../../utils/flag-router" -import { DistributedTransaction } from "../../../../utils/transaction" -import { validator } from "../../../../utils/validator" import { createVariantsTransaction, revertVariantTransaction, } from "./transaction/create-product-variant" +import { defaultAdminProductFields, defaultAdminProductRelations } from "." + +import { DistributedTransaction } from "@medusajs/orchestration" +import { EntityManager } from "typeorm" +import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" +import { FlagRouter } from "../../../../utils/flag-router" +import { IInventoryService } from "@medusajs/types" +import { Logger } from "../../../../types/global" +import { ProductStatus } from "../../../../models" +import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels" +import { Type } from "class-transformer" import { createProductsWorkflow } from "../../../../workflows/admin/create-products" +import { validator } from "../../../../utils/validator" /** * @oas [post] /admin/products diff --git a/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts b/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts index c1e1d62396..3762329364 100644 --- a/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts +++ b/packages/medusa/src/api/routes/admin/products/transaction/create-product-variant.ts @@ -5,7 +5,7 @@ import { TransactionPayload, TransactionState, TransactionStepsDefinition, -} from "../../../../../utils/transaction" +} from "@medusajs/orchestration" import { IInventoryService, InventoryItemDTO } from "@medusajs/types" import { ProductVariantInventoryService, diff --git a/packages/medusa/src/api/routes/admin/products/update-product.ts b/packages/medusa/src/api/routes/admin/products/update-product.ts index 00b88d1315..0f53a62fea 100644 --- a/packages/medusa/src/api/routes/admin/products/update-product.ts +++ b/packages/medusa/src/api/routes/admin/products/update-product.ts @@ -1,6 +1,8 @@ -import { IInventoryService } from "@medusajs/types" -import { MedusaError } from "@medusajs/utils" -import { Type } from "class-transformer" +import { + CreateProductVariantInput, + ProductVariantPricesUpdateReq, + UpdateProductVariantInput, +} from "../../../../types/product-variant" import { IsArray, IsBoolean, @@ -14,36 +16,35 @@ import { ValidateIf, ValidateNested, } from "class-validator" -import { EntityManager } from "typeorm" -import { defaultAdminProductFields, defaultAdminProductRelations } from "." -import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels" -import { ProductStatus, ProductVariant } from "../../../../models" -import { ProductVariantRepository } from "../../../../repositories/product-variant" import { PricingService, ProductService, ProductVariantInventoryService, ProductVariantService, } from "../../../../services" -import { Logger } from "../../../../types/global" import { ProductProductCategoryReq, ProductSalesChannelReq, ProductTagReq, ProductTypeReq, } from "../../../../types/product" -import { - CreateProductVariantInput, - ProductVariantPricesUpdateReq, - UpdateProductVariantInput, -} from "../../../../types/product-variant" -import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" -import { DistributedTransaction } from "../../../../utils/transaction" -import { validator } from "../../../../utils/validator" +import { ProductStatus, ProductVariant } from "../../../../models" import { createVariantsTransaction, revertVariantTransaction, } from "./transaction/create-product-variant" +import { defaultAdminProductFields, defaultAdminProductRelations } from "." + +import { DistributedTransaction } from "@medusajs/orchestration" +import { EntityManager } from "typeorm" +import { FeatureFlagDecorators } from "../../../../utils/feature-flag-decorators" +import { IInventoryService } from "@medusajs/types" +import { Logger } from "../../../../types/global" +import { MedusaError } from "@medusajs/utils" +import { ProductVariantRepository } from "../../../../repositories/product-variant" +import SalesChannelFeatureFlag from "../../../../loaders/feature-flags/sales-channels" +import { Type } from "class-transformer" +import { validator } from "../../../../utils/validator" /** * @oas [post] /admin/products/{id} diff --git a/packages/medusa/src/utils/transaction/workflow-manager.ts b/packages/medusa/src/utils/transaction/workflow-manager.ts deleted file mode 100644 index 18f1a0dcde..0000000000 --- a/packages/medusa/src/utils/transaction/workflow-manager.ts +++ /dev/null @@ -1,226 +0,0 @@ -import { MedusaContainer } from "@medusajs/types" -import { - DistributedTransaction, - TransactionMetadata, -} from "./distributed-transaction" -import { TransactionOrchestrator } from "./transaction-orchestrator" -import { TransactionStepHandler } from "./transaction-step" -import { TransactionHandlerType, TransactionStepsDefinition } from "./types" -import { OrchestratorBuilder } from "./orchestrator-builder" - -interface Workflow { - id: string - handler: (container: MedusaContainer) => TransactionStepHandler - orchestrator: TransactionOrchestrator - flow_: TransactionStepsDefinition - handlers_: Map< - string, - { invoke: InvokeHandler; compensate?: CompensateHandler } - > - requiredModules?: Set - optionalModules?: Set -} - -type InvokeHandler = ( - container: MedusaContainer, - payload: any, - invoke: { [actions: string]: any }, - metadata: TransactionMetadata -) => Promise - -type CompensateHandler = ( - container: MedusaContainer, - payload: any, - invoke: { [actions: string]: any }, - compensate: { [actions: string]: any }, - metadata: TransactionMetadata -) => Promise - -export class WorkflowManager { - protected static workflows: Map = new Map() - protected container: MedusaContainer - - constructor(container?: MedusaContainer) { - this.container = container as MedusaContainer - } - - static unregister(workflowId: string) { - WorkflowManager.workflows.delete(workflowId) - } - - static unregisterAll() { - WorkflowManager.workflows.clear() - } - - static getWorkflows() { - return WorkflowManager.workflows - } - - static getTransactionDefinition(workflowId): OrchestratorBuilder { - if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) - } - - const workflow = WorkflowManager.workflows.get(workflowId)! - return new OrchestratorBuilder(workflow.flow_) - } - - static register( - workflowId: string, - flow: TransactionStepsDefinition | OrchestratorBuilder, - handlers: Map< - string, - { invoke: InvokeHandler; compensate?: CompensateHandler } - >, - requiredModules?: Set, - optionalModules?: Set - ) { - if (WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" is already defined.`) - } - - const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow - - WorkflowManager.workflows.set(workflowId, { - id: workflowId, - flow_: finalFlow, - orchestrator: new TransactionOrchestrator(workflowId, finalFlow), - handler: WorkflowManager.buildHandlers(handlers), - handlers_: handlers, - requiredModules, - optionalModules, - }) - } - - static update( - workflowId: string, - flow: TransactionStepsDefinition | OrchestratorBuilder, - handlers: Map< - string, - { invoke: InvokeHandler; compensate?: CompensateHandler } - >, - requiredModules?: Set, - optionalModules?: Set - ) { - if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) - } - - const workflow = WorkflowManager.workflows.get(workflowId)! - - for (const [key, value] of handlers.entries()) { - workflow.handlers_.set(key, value) - } - - const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow - - WorkflowManager.workflows.set(workflowId, { - id: workflowId, - flow_: finalFlow, - orchestrator: new TransactionOrchestrator(workflowId, finalFlow), - handler: WorkflowManager.buildHandlers(workflow.handlers_), - handlers_: workflow.handlers_, - requiredModules, - optionalModules, - }) - } - - private static buildHandlers( - handlers: Map< - string, - { invoke: InvokeHandler; compensate?: CompensateHandler } - > - ): (container: MedusaContainer) => TransactionStepHandler { - return (container: MedusaContainer): TransactionStepHandler => { - return async ( - actionId: string, - handlerType: TransactionHandlerType, - payload?: any - ) => { - const command = handlers.get(actionId) - - if (!command) { - throw new Error(`Handler for action "${actionId}" not found.`) - } else if (!command[handlerType]) { - throw new Error( - `"${handlerType}" handler for action "${actionId}" not found.` - ) - } - - const { invoke, compensate, payload: input } = payload.context - const { metadata } = payload - - if (handlerType === TransactionHandlerType.COMPENSATE) { - return await command[handlerType]!( - container, - input, - invoke, - compensate, - metadata - ) - } - - return await command[handlerType](container, input, invoke, metadata) - } - } - } - - async begin( - workflowId: string, - uniqueTransactionId: string, - input?: unknown - ) { - if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) - } - - const workflow = WorkflowManager.workflows.get(workflowId)! - - const orchestrator = workflow.orchestrator - - const transaction = await orchestrator.beginTransaction( - uniqueTransactionId, - workflow.handler(this.container), - input - ) - - await orchestrator.resume(transaction) - - return transaction - } - - async registerStepSuccess( - workflowId: string, - idempotencyKey: string, - response?: unknown - ): Promise { - if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) - } - - const workflow = WorkflowManager.workflows.get(workflowId)! - return await workflow.orchestrator.registerStepSuccess( - idempotencyKey, - workflow.handler(this.container), - undefined, - response - ) - } - - async registerStepFailure( - workflowId: string, - idempotencyKey: string, - error?: Error | any - ): Promise { - if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) - } - - const workflow = WorkflowManager.workflows.get(workflowId)! - return await workflow.orchestrator.registerStepFailure( - idempotencyKey, - error, - workflow.handler(this.container) - ) - } -} diff --git a/packages/medusa/src/workflows/admin/create-products/definition.ts b/packages/medusa/src/workflows/admin/create-products/definition.ts index dfc02bea55..4003cecfb8 100644 --- a/packages/medusa/src/workflows/admin/create-products/definition.ts +++ b/packages/medusa/src/workflows/admin/create-products/definition.ts @@ -3,7 +3,7 @@ import { TransactionPayload, TransactionStepHandler, TransactionStepsDefinition, -} from "../../../utils/transaction" +} from "@medusajs/orchestration" import { IInventoryService, MedusaContainer, diff --git a/packages/medusa/src/workflows/admin/create-products/index.ts b/packages/medusa/src/workflows/admin/create-products/index.ts index b6c7d5eb32..485ab5a4d9 100644 --- a/packages/medusa/src/workflows/admin/create-products/index.ts +++ b/packages/medusa/src/workflows/admin/create-products/index.ts @@ -3,7 +3,7 @@ import { MedusaError } from "@medusajs/utils" import { TransactionOrchestrator, TransactionState, -} from "../../../utils/transaction" +} from "@medusajs/orchestration" import { AdminPostProductsReq } from "../../../api" import { Product } from "../../../models" import { PricedProduct } from "../../../types/pricing" diff --git a/packages/modules-sdk/src/medusa-module.ts b/packages/modules-sdk/src/medusa-module.ts index be20290011..8d87599a02 100644 --- a/packages/modules-sdk/src/medusa-module.ts +++ b/packages/modules-sdk/src/medusa-module.ts @@ -1,10 +1,9 @@ import { ExternalModuleDeclaration, InternalModuleDeclaration, - JoinerServiceConfig, + LoadedModule, MODULE_RESOURCE_TYPE, MODULE_SCOPE, - ModuleDefinition, ModuleExports, ModuleResolution, } from "@medusajs/types" @@ -27,7 +26,10 @@ const logger: any = { declare global { interface MedusaModule { - getLoadedModules(): Map + getLoadedModules( + aliases?: Map + ): { [key: string]: LoadedModule }[] + getModuleInstance(moduleKey: string, alias?: string): LoadedModule } } @@ -43,14 +45,16 @@ export class MedusaModule { private static modules_: Map = new Map() private static loading_: Map> = new Map() - public static getLoadedModules(): Map< - string, - any & { - __joinerConfig: JoinerServiceConfig - __definition: ModuleDefinition - } - > { - return MedusaModule.instances_ + public static getLoadedModules( + aliases?: Map + ): { [key: string]: LoadedModule }[] { + return [...MedusaModule.modules_.entries()].map(([key]) => { + if (aliases?.has(key)) { + return MedusaModule.getModuleInstance(key, aliases.get(key)) + } + + return MedusaModule.getModuleInstance(key) + }) } public static clearInstances(): void { @@ -267,3 +271,6 @@ export class MedusaModule { } } } + +global.MedusaModule ??= MedusaModule +exports.MedusaModule = global.MedusaModule diff --git a/packages/modules-sdk/src/remote-query.ts b/packages/modules-sdk/src/remote-query.ts index 7df0828f7f..9dd2d9d48e 100644 --- a/packages/modules-sdk/src/remote-query.ts +++ b/packages/modules-sdk/src/remote-query.ts @@ -1,7 +1,7 @@ import { JoinerRelationship, JoinerServiceConfig, - ModuleDefinition, + LoadedModule, RemoteExpandProperty, } from "@medusajs/types" @@ -11,13 +11,10 @@ import { toPascalCase } from "@medusajs/utils" export class RemoteQuery { private remoteJoiner: RemoteJoiner - private modulesMap: Map = new Map() + private modulesMap: Map = new Map() constructor( - modulesLoaded?: (any & { - __joinerConfig: JoinerServiceConfig - __definition: ModuleDefinition - })[], + modulesLoaded?: LoadedModule[], remoteFetchData?: ( expand: RemoteExpandProperty, keyField: string, @@ -29,8 +26,8 @@ export class RemoteQuery { }> ) { if (!modulesLoaded?.length) { - modulesLoaded = [...MedusaModule.getLoadedModules().entries()].map( - ([, mod]) => mod + modulesLoaded = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] ) } @@ -130,7 +127,7 @@ export class RemoteQuery { path?: string }> { const serviceConfig = expand.serviceConfig - const service = this.modulesMap.get(serviceConfig.serviceName) + const service = this.modulesMap.get(serviceConfig.serviceName)! let filters = {} const options = { diff --git a/packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts b/packages/orchestration/src/__tests__/transaction/orchestrator-builder.ts similarity index 100% rename from packages/medusa/src/utils/__tests__/transaction/orchestrator-builder.ts rename to packages/orchestration/src/__tests__/transaction/orchestrator-builder.ts diff --git a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts similarity index 99% rename from packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts rename to packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index f08b18befa..6fe90e082c 100644 --- a/packages/medusa/src/utils/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -1,9 +1,9 @@ import { - TransactionOrchestrator, - TransactionStepsDefinition, TransactionHandlerType, + TransactionOrchestrator, TransactionPayload, TransactionState, + TransactionStepsDefinition, } from "../../transaction" describe("Transaction Orchestrator", () => { @@ -96,7 +96,7 @@ describe("Transaction Orchestrator", () => { ) }) - it("Should run steps in parallel if 'next' is an array", async () => { + it("Should resume steps in parallel if 'next' is an array", async () => { const actionOrder: string[] = [] async function handler( actionId: string, diff --git a/packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts b/packages/orchestration/src/__tests__/workflow/global-workflow.ts similarity index 87% rename from packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts rename to packages/orchestration/src/__tests__/workflow/global-workflow.ts index e0d809cf7f..0669aea5dd 100644 --- a/packages/medusa/src/utils/__tests__/transaction/workflow-manager.ts +++ b/packages/orchestration/src/__tests__/workflow/global-workflow.ts @@ -1,11 +1,12 @@ -import { WorkflowManager } from "../../transaction/workflow-manager" +import { GlobalWorkflow } from "../../workflow/global-workflow" import { TransactionState } from "../../transaction/types" +import { WorkflowManager } from "../../workflow/workflow-manager" describe("WorkflowManager", () => { const container: any = {} let handlers - let flow: WorkflowManager + let flow: GlobalWorkflow let asyncStepIdempotencyKey: string beforeEach(() => { @@ -31,7 +32,7 @@ describe("WorkflowManager", () => { }) handlers.set("callExternal", { - invoke: jest.fn((container, payload, invoke, metadata) => { + invoke: jest.fn(({ metadata }) => { asyncStepIdempotencyKey = metadata.idempotency_key }), }) @@ -74,7 +75,7 @@ describe("WorkflowManager", () => { handlers ) - flow = new WorkflowManager(container) + flow = new GlobalWorkflow(container) }) it("should return all registered workflows", () => { @@ -83,7 +84,7 @@ describe("WorkflowManager", () => { }) it("should begin a transaction and returns its final state", async () => { - const transaction = await flow.begin("create-product", "t-id", { + const transaction = await flow.run("create-product", "t-id", { input: 123, }) @@ -97,7 +98,7 @@ describe("WorkflowManager", () => { }) it("should begin a transaction and revert it when fail", async () => { - const transaction = await flow.begin("broken-delivery", "t-id") + const transaction = await flow.run("broken-delivery", "t-id") expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("broken").invoke).toHaveBeenCalledTimes(1) @@ -109,7 +110,7 @@ describe("WorkflowManager", () => { }) it("should continue an asyncronous transaction after reporting a successful step", async () => { - const transaction = await flow.begin("deliver-product", "t-id") + const transaction = await flow.run("deliver-product", "t-id") expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) @@ -128,7 +129,7 @@ describe("WorkflowManager", () => { }) it("should revert an asyncronous transaction after reporting a failure step", async () => { - const transaction = await flow.begin("deliver-product", "t-id") + const transaction = await flow.run("deliver-product", "t-id") expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) @@ -150,7 +151,7 @@ describe("WorkflowManager", () => { expect(continuation.getState()).toBe(TransactionState.FAILED) }) - it("should update an existing flow with a new step and a new handler", async () => { + it("should update an existing global flow with a new step and a new handler", async () => { const definition = WorkflowManager.getTransactionDefinition("create-product") @@ -164,8 +165,7 @@ describe("WorkflowManager", () => { WorkflowManager.update("create-product", definition, additionalHandlers) - const transaction = await flow.begin("create-product", "t-id") - console.log(transaction) + const transaction = await flow.run("create-product", "t-id") expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) diff --git a/packages/orchestration/src/__tests__/workflow/local-workflow.ts b/packages/orchestration/src/__tests__/workflow/local-workflow.ts new file mode 100644 index 0000000000..9d8209a36d --- /dev/null +++ b/packages/orchestration/src/__tests__/workflow/local-workflow.ts @@ -0,0 +1,175 @@ +import { LocalWorkflow } from "../../workflow/local-workflow" +import { TransactionState } from "../../transaction/types" +import { WorkflowManager } from "../../workflow/workflow-manager" + +describe("WorkflowManager", () => { + const container: any = {} + + let handlers + let asyncStepIdempotencyKey: string + + beforeEach(() => { + jest.resetAllMocks() + WorkflowManager.unregisterAll() + + handlers = new Map() + handlers.set("foo", { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn(() => {}), + }) + + handlers.set("bar", { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn().mockResolvedValue({}), + }) + + handlers.set("broken", { + invoke: jest.fn(() => { + throw new Error("Step Failed") + }), + compensate: jest.fn().mockResolvedValue({ bar: 123, reverted: true }), + }) + + handlers.set("callExternal", { + invoke: jest.fn(({ metadata }) => { + asyncStepIdempotencyKey = metadata.idempotency_key + }), + }) + + WorkflowManager.register( + "create-product", + { + action: "foo", + next: { + action: "bar", + }, + }, + handlers + ) + + WorkflowManager.register( + "broken-delivery", + { + action: "foo", + next: { + action: "broken", + }, + }, + handlers + ) + + WorkflowManager.register( + "deliver-product", + { + action: "foo", + next: { + action: "callExternal", + async: true, + noCompensation: true, + next: { + action: "bar", + }, + }, + }, + handlers + ) + }) + + it("should return all registered workflows", () => { + const wf = Object.keys(Object.fromEntries(WorkflowManager.getWorkflows())) + expect(wf).toEqual(["create-product", "broken-delivery", "deliver-product"]) + }) + + it("should begin a transaction and returns its final state", async () => { + const flow = new LocalWorkflow("create-product", container) + const transaction = await flow.run("t-id", { + input: 123, + }) + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0) + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.DONE) + }) + + it("should begin a transaction and revert it when fail", async () => { + const flow = new LocalWorkflow("broken-delivery", container) + const transaction = await flow.run("t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("broken").invoke).toHaveBeenCalledTimes(1) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1) + expect(handlers.get("broken").compensate).toHaveBeenCalledTimes(1) + + expect(transaction.getState()).toBe(TransactionState.REVERTED) + }) + + it("should continue an asyncronous transaction after reporting a successful step", async () => { + const flow = new LocalWorkflow("deliver-product", container) + const transaction = await flow.run("t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const continuation = await flow.registerStepSuccess( + asyncStepIdempotencyKey, + { ok: true } + ) + + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + expect(continuation.getState()).toBe(TransactionState.DONE) + }) + + it("should revert an asyncronous transaction after reporting a failure step", async () => { + const flow = new LocalWorkflow("deliver-product", container) + const transaction = await flow.run("t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("callExternal").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + + expect(transaction.getState()).toBe(TransactionState.INVOKING) + + const continuation = await flow.registerStepFailure( + asyncStepIdempotencyKey, + { ok: true } + ) + + expect(handlers.get("foo").compensate).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(0) + expect(handlers.get("bar").compensate).toHaveBeenCalledTimes(0) + + // Failed because the async is flagged as noCompensation + expect(continuation.getState()).toBe(TransactionState.FAILED) + }) + + it("should update a flow with a new step and a new handler", async () => { + const flow = new LocalWorkflow("create-product", container) + + const additionalHandler = { + invoke: jest.fn().mockResolvedValue({ done: true }), + compensate: jest.fn().mockResolvedValue({}), + } + + flow.insertActionBefore("bar", "xor", additionalHandler, { maxRetries: 3 }) + + const transaction = await flow.run("t-id") + + expect(handlers.get("foo").invoke).toHaveBeenCalledTimes(1) + expect(handlers.get("bar").invoke).toHaveBeenCalledTimes(1) + expect(additionalHandler.invoke).toHaveBeenCalledTimes(1) + + expect(transaction.getState()).toBe(TransactionState.DONE) + + expect( + WorkflowManager.getWorkflow("create-product")?.handlers_.has("xor") + ).toEqual(false) + }) +}) diff --git a/packages/orchestration/src/index.ts b/packages/orchestration/src/index.ts index 3a2ffb55e4..01dd889673 100644 --- a/packages/orchestration/src/index.ts +++ b/packages/orchestration/src/index.ts @@ -1 +1,3 @@ export * from "./joiner" +export * from "./transaction" +export * from "./workflow" diff --git a/packages/medusa/src/utils/transaction/distributed-transaction.ts b/packages/orchestration/src/transaction/distributed-transaction.ts similarity index 100% rename from packages/medusa/src/utils/transaction/distributed-transaction.ts rename to packages/orchestration/src/transaction/distributed-transaction.ts diff --git a/packages/medusa/src/utils/transaction/index.ts b/packages/orchestration/src/transaction/index.ts similarity index 78% rename from packages/medusa/src/utils/transaction/index.ts rename to packages/orchestration/src/transaction/index.ts index 7c51c151da..4380d8bfa6 100644 --- a/packages/medusa/src/utils/transaction/index.ts +++ b/packages/orchestration/src/transaction/index.ts @@ -2,3 +2,4 @@ export * from "./types" export * from "./transaction-orchestrator" export * from "./transaction-step" export * from "./distributed-transaction" +export * from "./orchestrator-builder" diff --git a/packages/medusa/src/utils/transaction/orchestrator-builder.ts b/packages/orchestration/src/transaction/orchestrator-builder.ts similarity index 92% rename from packages/medusa/src/utils/transaction/orchestrator-builder.ts rename to packages/orchestration/src/transaction/orchestrator-builder.ts index 6e39adcbd9..711902c5d6 100644 --- a/packages/medusa/src/utils/transaction/orchestrator-builder.ts +++ b/packages/orchestration/src/transaction/orchestrator-builder.ts @@ -1,9 +1,5 @@ import { TransactionStepsDefinition } from "./types" -export type ActionHandler = { - [type: string]: (data: any, context: any) => Promise -} - interface InternalStep extends TransactionStepsDefinition { next?: InternalStep | InternalStep[] depth: number @@ -11,7 +7,12 @@ interface InternalStep extends TransactionStepsDefinition { } export class OrchestratorBuilder { - private steps: InternalStep + protected steps: InternalStep + protected hasChanges_ = false + + get hasChanges() { + return this.hasChanges_ + } constructor(steps?: TransactionStepsDefinition) { this.load(steps) @@ -42,6 +43,7 @@ export class OrchestratorBuilder { } as InternalStep step.next = newAction + this.hasChanges_ = true return this } @@ -56,6 +58,7 @@ export class OrchestratorBuilder { Object.assign(step, options) + this.hasChanges_ = true return this } @@ -92,6 +95,7 @@ export class OrchestratorBuilder { this.updateDepths(oldNext as InternalStep, parentStep) } + this.hasChanges_ = true return this } @@ -112,11 +116,12 @@ export class OrchestratorBuilder { } as InternalStep this.updateDepths(oldNext as InternalStep, step.next) + this.hasChanges_ = true return this } - private appendTo(step: InternalStep | string, newStep: InternalStep) { + protected appendTo(step: InternalStep | string, newStep: InternalStep) { if (typeof step === "string") { step = this.findOrThrowStepByAction(step) } @@ -127,6 +132,7 @@ export class OrchestratorBuilder { parent: step.action, } as InternalStep + this.hasChanges_ = true return this } @@ -146,7 +152,7 @@ export class OrchestratorBuilder { return this } - private move( + protected move( actionToMove: string, targetAction: string, { @@ -226,22 +232,20 @@ export class OrchestratorBuilder { parentTargetActionStep.depth ) + this.hasChanges_ = true + return this } moveAction(actionToMove: string, targetAction: string): OrchestratorBuilder { - this.move(actionToMove, targetAction) - - return this + return this.move(actionToMove, targetAction) } moveAndMergeNextAction( actionToMove: string, targetAction: string ): OrchestratorBuilder { - this.move(actionToMove, targetAction, { mergeNext: true }) - - return this + return this.move(actionToMove, targetAction, { mergeNext: true }) } mergeActions(where: string, ...actions: string[]) { @@ -284,6 +288,8 @@ export class OrchestratorBuilder { parentStep.depth ) + this.hasChanges_ = true + return this } @@ -300,10 +306,11 @@ export class OrchestratorBuilder { delete parentStep.next } + this.hasChanges_ = true return this } - private findStepByAction( + protected findStepByAction( action: string, step: InternalStep = this.steps ): InternalStep | undefined { @@ -325,7 +332,7 @@ export class OrchestratorBuilder { return } - private findOrThrowStepByAction( + protected findOrThrowStepByAction( action: string, steps: InternalStep = this.steps ): InternalStep { @@ -337,7 +344,7 @@ export class OrchestratorBuilder { return step } - private findParentStepByAction( + protected findParentStepByAction( action: string, step: InternalStep = this.steps ): InternalStep | undefined { @@ -365,7 +372,7 @@ export class OrchestratorBuilder { return } - private findLastStep(steps: InternalStep = this.steps): InternalStep { + protected findLastStep(steps: InternalStep = this.steps): InternalStep { let step = steps as InternalStep while (step.next) { step = Array.isArray(step.next) @@ -376,7 +383,7 @@ export class OrchestratorBuilder { return step } - private updateDepths( + protected updateDepths( startingStep: InternalStep, parent, incr = 1, @@ -417,6 +424,8 @@ export class OrchestratorBuilder { return value } ) + + this.hasChanges_ = false return result } } diff --git a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts similarity index 99% rename from packages/medusa/src/utils/transaction/transaction-orchestrator.ts rename to packages/orchestration/src/transaction/transaction-orchestrator.ts index 63f8b6d35f..7729d5f589 100644 --- a/packages/medusa/src/utils/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -1,18 +1,19 @@ -import { EventEmitter } from "events" -import { - TransactionHandlerType, - TransactionStepsDefinition, - TransactionStepStatus, - TransactionState, - TransactionModel, -} from "./types" import { DistributedTransaction, TransactionCheckpoint, TransactionPayload, } from "./distributed-transaction" +import { + TransactionHandlerType, + TransactionModel, + TransactionState, + TransactionStepStatus, + TransactionStepsDefinition, +} from "./types" import { TransactionStep, TransactionStepHandler } from "./transaction-step" +import { EventEmitter } from "events" + export type TransactionFlow = { modelId: string definition: TransactionStepsDefinition diff --git a/packages/medusa/src/utils/transaction/transaction-step.ts b/packages/orchestration/src/transaction/transaction-step.ts similarity index 100% rename from packages/medusa/src/utils/transaction/transaction-step.ts rename to packages/orchestration/src/transaction/transaction-step.ts diff --git a/packages/medusa/src/utils/transaction/types.ts b/packages/orchestration/src/transaction/types.ts similarity index 100% rename from packages/medusa/src/utils/transaction/types.ts rename to packages/orchestration/src/transaction/types.ts diff --git a/packages/orchestration/src/workflow/global-workflow.ts b/packages/orchestration/src/workflow/global-workflow.ts new file mode 100644 index 0000000000..8545512fc5 --- /dev/null +++ b/packages/orchestration/src/workflow/global-workflow.ts @@ -0,0 +1,94 @@ +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { WorkflowDefinition, WorkflowManager } from "./workflow-manager" + +import { DistributedTransaction } from "../transaction" +import { asValue } from "awilix" +import { createMedusaContainer } from "@medusajs/utils" + +export class GlobalWorkflow extends WorkflowManager { + protected static workflows: Map = new Map() + protected container: MedusaContainer + protected context: Context + + constructor( + modulesLoaded?: LoadedModule[] | MedusaContainer, + context?: Context + ) { + super() + + const container = createMedusaContainer() + + // Medusa container + if (!Array.isArray(modulesLoaded) && modulesLoaded) { + const cradle = modulesLoaded.cradle + for (const key in cradle) { + container.register(key, asValue(cradle[key])) + } + } + // Array of modules + else if (modulesLoaded?.length) { + for (const mod of modulesLoaded) { + const registrationName = mod.__definition.registrationName + container.register(registrationName, asValue(mod)) + } + } + + this.container = container + this.context = context ?? {} + } + + async run(workflowId: string, uniqueTransactionId: string, input?: unknown) { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + + const orchestrator = workflow.orchestrator + + const transaction = await orchestrator.beginTransaction( + uniqueTransactionId, + workflow.handler(this.container, this.context), + input + ) + + await orchestrator.resume(transaction) + + return transaction + } + + async registerStepSuccess( + workflowId: string, + idempotencyKey: string, + response?: unknown + ): Promise { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return await workflow.orchestrator.registerStepSuccess( + idempotencyKey, + workflow.handler(this.container, this.context), + undefined, + response + ) + } + + async registerStepFailure( + workflowId: string, + idempotencyKey: string, + error?: Error | any + ): Promise { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return await workflow.orchestrator.registerStepFailure( + idempotencyKey, + error, + workflow.handler(this.container, this.context) + ) + } +} diff --git a/packages/orchestration/src/workflow/index.ts b/packages/orchestration/src/workflow/index.ts new file mode 100644 index 0000000000..6a8abc0698 --- /dev/null +++ b/packages/orchestration/src/workflow/index.ts @@ -0,0 +1,3 @@ +export * from "./workflow-manager" +export * from "./local-workflow" +export * from "./global-workflow" diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts new file mode 100644 index 0000000000..fdbef1c72c --- /dev/null +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -0,0 +1,209 @@ +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { + DistributedTransaction, + TransactionOrchestrator, + TransactionStepsDefinition, +} from "../transaction" +import { + WorkflowDefinition, + WorkflowManager, + WorkflowStepHandler, +} from "./workflow-manager" + +import { OrchestratorBuilder } from "../transaction/orchestrator-builder" +import { asValue } from "awilix" +import { createMedusaContainer } from "@medusajs/utils" + +type StepHandler = { + invoke: WorkflowStepHandler + compensate?: WorkflowStepHandler +} + +export class LocalWorkflow { + protected container: MedusaContainer + protected workflowId: string + protected flow: OrchestratorBuilder + protected workflow: WorkflowDefinition + protected handlers: Map + + constructor( + workflowId: string, + modulesLoaded?: LoadedModule[] | MedusaContainer + ) { + const globalWorkflow = WorkflowManager.getWorkflow(workflowId) + if (!globalWorkflow) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + this.flow = new OrchestratorBuilder(globalWorkflow.flow_) + this.workflowId = workflowId + this.workflow = globalWorkflow + this.handlers = new Map(globalWorkflow.handlers_) + + const container = createMedusaContainer() + + // Medusa container + if (!Array.isArray(modulesLoaded) && modulesLoaded) { + const cradle = modulesLoaded.cradle + for (const key in cradle) { + container.register(key, asValue(cradle[key])) + } + } + // Array of modules + else if (modulesLoaded?.length) { + for (const mod of modulesLoaded) { + const registrationName = mod.__definition.registrationName + container.register(registrationName, asValue(mod)) + } + } + + this.container = container + } + + protected commit() { + const finalFlow = this.flow.build() + + this.workflow = { + id: this.workflowId, + flow_: finalFlow, + orchestrator: new TransactionOrchestrator(this.workflowId, finalFlow), + handler: WorkflowManager.buildHandlers(this.handlers), + handlers_: this.handlers, + } + } + + async run(uniqueTransactionId: string, input?: unknown, context?: Context) { + if (this.flow.hasChanges) { + this.commit() + } + + const { handler, orchestrator } = this.workflow + + const transaction = await orchestrator.beginTransaction( + uniqueTransactionId, + handler(this.container, context), + input + ) + + await orchestrator.resume(transaction) + + return transaction + } + + async registerStepSuccess( + idempotencyKey: string, + response?: unknown, + context?: Context + ): Promise { + const { handler, orchestrator } = this.workflow + return await orchestrator.registerStepSuccess( + idempotencyKey, + handler(this.container, context), + undefined, + response + ) + } + + async registerStepFailure( + idempotencyKey: string, + error?: Error | any, + context?: Context + ): Promise { + const { handler, orchestrator } = this.workflow + return await orchestrator.registerStepFailure( + idempotencyKey, + error, + handler(this.container, context) + ) + } + + addAction( + action: string, + handler: StepHandler, + options: Partial = {} + ) { + this.assertHandler(handler, action) + this.handlers.set(action, handler) + + return this.flow.addAction(action, options) + } + + replaceAction( + existingAction: string, + action: string, + handler: StepHandler, + options: Partial = {} + ) { + this.assertHandler(handler, action) + this.handlers.set(action, handler) + + return this.flow.replaceAction(existingAction, action, options) + } + + insertActionBefore( + existingAction: string, + action: string, + handler: StepHandler, + options: Partial = {} + ) { + this.assertHandler(handler, action) + this.handlers.set(action, handler) + + return this.flow.insertActionBefore(existingAction, action, options) + } + + insertActionAfter( + existingAction: string, + action: string, + handler: StepHandler, + options: Partial = {} + ) { + this.assertHandler(handler, action) + this.handlers.set(action, handler) + + return this.flow.insertActionAfter(existingAction, action, options) + } + + appendAction( + action: string, + to: string, + handler: StepHandler, + options: Partial = {} + ) { + this.assertHandler(handler, action) + this.handlers.set(action, handler) + + return this.flow.appendAction(action, to, options) + } + + moveAction(actionToMove: string, targetAction: string): OrchestratorBuilder { + return this.flow.moveAction(actionToMove, targetAction) + } + + moveAndMergeNextAction( + actionToMove: string, + targetAction: string + ): OrchestratorBuilder { + return this.flow.moveAndMergeNextAction(actionToMove, targetAction) + } + + mergeActions(where: string, ...actions: string[]) { + return this.flow.mergeActions(where, ...actions) + } + + deleteAction(action: string, parentSteps?) { + return this.flow.deleteAction(action, parentSteps) + } + + pruneAction(action: string) { + return this.flow.pruneAction(action) + } + + protected assertHandler(handler: StepHandler, action: string): void | never { + if (!handler?.invoke) { + throw new Error( + `Handler for action "${action}" is missing invoke function.` + ) + } + } +} diff --git a/packages/orchestration/src/workflow/workflow-manager.ts b/packages/orchestration/src/workflow/workflow-manager.ts new file mode 100644 index 0000000000..502b15c672 --- /dev/null +++ b/packages/orchestration/src/workflow/workflow-manager.ts @@ -0,0 +1,168 @@ +import { Context, MedusaContainer } from "@medusajs/types" +import { + OrchestratorBuilder, + TransactionHandlerType, + TransactionMetadata, + TransactionOrchestrator, + TransactionStepHandler, + TransactionStepsDefinition, +} from "../transaction" + +export interface WorkflowDefinition { + id: string + handler: ( + container: MedusaContainer, + context?: Context + ) => TransactionStepHandler + orchestrator: TransactionOrchestrator + flow_: TransactionStepsDefinition + handlers_: Map< + string, + { invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler } + > + requiredModules?: Set + optionalModules?: Set +} + +export type WorkflowHandler = Map< + string, + { invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler } +> + +export type WorkflowStepHandler = (args: { + container: MedusaContainer + payload: unknown + invoke: { [actions: string]: unknown } + compensate: { [actions: string]: unknown } + metadata: TransactionMetadata + context?: Context +}) => unknown + +export class WorkflowManager { + protected static workflows: Map = new Map() + + static unregister(workflowId: string) { + WorkflowManager.workflows.delete(workflowId) + } + + static unregisterAll() { + WorkflowManager.workflows.clear() + } + + static getWorkflows() { + return WorkflowManager.workflows + } + + static getWorkflow(workflowId: string) { + return WorkflowManager.workflows.get(workflowId) + } + + static getTransactionDefinition(workflowId): OrchestratorBuilder { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + return new OrchestratorBuilder(workflow.flow_) + } + + static register( + workflowId: string, + flow: TransactionStepsDefinition | OrchestratorBuilder, + handlers: WorkflowHandler, + requiredModules?: Set, + optionalModules?: Set + ) { + if (WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" is already defined.`) + } + + const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow + + WorkflowManager.workflows.set(workflowId, { + id: workflowId, + flow_: finalFlow, + orchestrator: new TransactionOrchestrator(workflowId, finalFlow), + handler: WorkflowManager.buildHandlers(handlers), + handlers_: handlers, + requiredModules, + optionalModules, + }) + } + + static update( + workflowId: string, + flow: TransactionStepsDefinition | OrchestratorBuilder, + handlers: Map< + string, + { invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler } + >, + requiredModules?: Set, + optionalModules?: Set + ) { + if (!WorkflowManager.workflows.has(workflowId)) { + throw new Error(`Workflow with id "${workflowId}" not found.`) + } + + const workflow = WorkflowManager.workflows.get(workflowId)! + + for (const [key, value] of handlers.entries()) { + workflow.handlers_.set(key, value) + } + + const finalFlow = flow instanceof OrchestratorBuilder ? flow.build() : flow + + WorkflowManager.workflows.set(workflowId, { + id: workflowId, + flow_: finalFlow, + orchestrator: new TransactionOrchestrator(workflowId, finalFlow), + handler: WorkflowManager.buildHandlers(workflow.handlers_), + handlers_: workflow.handlers_, + requiredModules, + optionalModules, + }) + } + + public static buildHandlers( + handlers: Map< + string, + { invoke: WorkflowStepHandler; compensate?: WorkflowStepHandler } + > + ): (container: MedusaContainer, context?: Context) => TransactionStepHandler { + return ( + container: MedusaContainer, + context?: Context + ): TransactionStepHandler => { + return async ( + actionId: string, + handlerType: TransactionHandlerType, + payload?: any + ) => { + const command = handlers.get(actionId) + + if (!command) { + throw new Error(`Handler for action "${actionId}" not found.`) + } else if (!command[handlerType]) { + throw new Error( + `"${handlerType}" handler for action "${actionId}" not found.` + ) + } + + const { invoke, compensate, payload: input } = payload.context + const { metadata } = payload + + return await command[handlerType]!({ + container, + payload: input, + invoke, + compensate, + metadata, + context, + }) + } + } + } +} + +global.WorkflowManager ??= WorkflowManager +exports.WorkflowManager = global.WorkflowManager diff --git a/packages/orchestration/tsconfig.json b/packages/orchestration/tsconfig.json index 9fa65c92eb..d27e01678b 100644 --- a/packages/orchestration/tsconfig.json +++ b/packages/orchestration/tsconfig.json @@ -1,7 +1,7 @@ { "compilerOptions": { - "lib": ["es5", "es6", "es2019"], - "target": "es5", + "lib": ["es2020"], + "target": "es2020", "outDir": "./dist", "esModuleInterop": true, "declaration": true, diff --git a/packages/types/src/modules-sdk/index.ts b/packages/types/src/modules-sdk/index.ts index d49ad27cca..ad331e2dbf 100644 --- a/packages/types/src/modules-sdk/index.ts +++ b/packages/types/src/modules-sdk/index.ts @@ -1,5 +1,6 @@ -import { MedusaContainer } from "../common" +import { JoinerServiceConfig } from "../joiner" import { Logger } from "../logger" +import { MedusaContainer } from "../common" import { RepositoryService } from "../dal" export type Constructor = new (...args: any[]) => T @@ -69,6 +70,11 @@ export type ModuleDefinition = { | ExternalModuleDeclaration } +export type LoadedModule = unknown & { + __joinerConfig: JoinerServiceConfig + __definition: ModuleDefinition +} + export type LoaderOptions> = { container: MedusaContainer options?: TOptions diff --git a/packages/types/src/shared-context.ts b/packages/types/src/shared-context.ts index 81c925ee0d..cfc127a374 100644 --- a/packages/types/src/shared-context.ts +++ b/packages/types/src/shared-context.ts @@ -8,4 +8,5 @@ export type Context = { transactionManager?: TManager isolationLevel?: string enableNestedTransactions?: boolean + transactionId?: string } diff --git a/packages/workflows/jest.config.js b/packages/workflows/jest.config.js new file mode 100644 index 0000000000..7de5bf104a --- /dev/null +++ b/packages/workflows/jest.config.js @@ -0,0 +1,13 @@ +module.exports = { + globals: { + "ts-jest": { + tsConfig: "tsconfig.json", + isolatedModules: false, + }, + }, + transform: { + "^.+\\.[jt]s?$": "ts-jest", + }, + testEnvironment: `node`, + moduleFileExtensions: [`js`, `ts`], +} diff --git a/packages/workflows/package.json b/packages/workflows/package.json new file mode 100644 index 0000000000..e60bc3af82 --- /dev/null +++ b/packages/workflows/package.json @@ -0,0 +1,41 @@ +{ + "name": "@medusajs/workflows", + "version": "0.0.1", + "description": "Set of workflows for Medusa", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "repository": { + "type": "git", + "url": "https://github.com/medusajs/medusa", + "directory": "packages/workflows" + }, + "publishConfig": { + "access": "public" + }, + "files": [ + "dist" + ], + "author": "Medusa", + "license": "MIT", + "devDependencies": { + "@medusajs/types": "^1.8.10", + "cross-env": "^5.2.1", + "jest": "^25.5.4", + "rimraf": "^5.0.1", + "ts-jest": "^25.5.1", + "typescript": "^4.4.4" + }, + "dependencies": { + "@medusajs/modules-sdk": "^1.8.8", + "@medusajs/orchestration": "^0.0.2", + "@medusajs/utils": "^1.9.2", + "awilix": "^8.0.1", + "ulid": "^2.3.0" + }, + "scripts": { + "prepare": "cross-env NODE_ENV=production yarn run build", + "build": "rimraf dist && tsc --build", + "watch": "tsc --build --watch", + "test": "jest --passWithNoTests" + } +} diff --git a/packages/workflows/src/definition/create-products.ts b/packages/workflows/src/definition/create-products.ts new file mode 100644 index 0000000000..b8a9b7663e --- /dev/null +++ b/packages/workflows/src/definition/create-products.ts @@ -0,0 +1,56 @@ +import { InputAlias, Workflows } from "../definitions" +import { + TransactionStepsDefinition, + WorkflowManager, +} from "@medusajs/orchestration" +import { + createProducts as createProductsHandler, + removeProducts, +} from "../handlers" +import { exportWorkflow, pipe } from "../helper" + +import { ProductTypes } from "@medusajs/types" + +enum Actions { + createProducts = "createProducts", +} + +const workflowSteps: TransactionStepsDefinition = { + next: { + action: Actions.createProducts, + }, +} + +const handlers = new Map([ + [ + Actions.createProducts, + { + invoke: pipe( + { + inputAlias: InputAlias.Products, + invoke: { + from: InputAlias.Products, + alias: InputAlias.Products, + }, + }, + createProductsHandler + ), + compensate: pipe( + { + invoke: { + from: Actions.createProducts, + alias: InputAlias.Products, + }, + }, + removeProducts + ), + }, + ], +]) + +WorkflowManager.register(Workflows.CreateProducts, workflowSteps, handlers) + +export const createProducts = exportWorkflow< + ProductTypes.CreateProductDTO[], + ProductTypes.ProductDTO[] +>(Workflows.CreateProducts, Actions.createProducts) diff --git a/packages/workflows/src/definition/index.ts b/packages/workflows/src/definition/index.ts new file mode 100644 index 0000000000..fa30720e57 --- /dev/null +++ b/packages/workflows/src/definition/index.ts @@ -0,0 +1 @@ +export * from "./create-products" diff --git a/packages/workflows/src/definitions.ts b/packages/workflows/src/definitions.ts new file mode 100644 index 0000000000..fb53ef7afe --- /dev/null +++ b/packages/workflows/src/definitions.ts @@ -0,0 +1,14 @@ +export enum Workflows { + CreateProducts = "create-products", +} + +export enum InputAlias { + Products = "products", + RemovedProducts = "removedProducts", + + InventoryItems = "inventoryItems", + RemovedInventoryItems = "removedInventoryItems", + + AttachedInventoryItems = "attachedInventoryItems", + DetachedInventoryItems = "detachedInventoryItems", +} diff --git a/packages/workflows/src/handlers/attach-inventory-items.ts b/packages/workflows/src/handlers/attach-inventory-items.ts new file mode 100644 index 0000000000..19806b82d0 --- /dev/null +++ b/packages/workflows/src/handlers/attach-inventory-items.ts @@ -0,0 +1,35 @@ +import { InventoryItemDTO, ProductTypes } from "@medusajs/types" + +import { InputAlias } from "../definitions" +import { WorkflowArguments } from "../helper" + +export async function attachInventoryItems({ + container, + data, +}: WorkflowArguments & { + data: { + variant: ProductTypes.ProductVariantDTO + [InputAlias.InventoryItems]: InventoryItemDTO + }[] +}) { + const manager = container.resolve("manager") + const productVariantInventoryService = container + .resolve("productVariantInventoryService") + .withTransaction(manager) + + const value = await Promise.all( + data + .filter((d) => d) + .map(async ({ variant, [InputAlias.InventoryItems]: inventoryItem }) => { + return await productVariantInventoryService.attachInventoryItem( + variant.id, + inventoryItem.id + ) + }) + ) + + return { + alias: InputAlias.AttachedInventoryItems, + value, + } +} diff --git a/packages/workflows/src/handlers/create-inventory-items.ts b/packages/workflows/src/handlers/create-inventory-items.ts new file mode 100644 index 0000000000..937dc89c82 --- /dev/null +++ b/packages/workflows/src/handlers/create-inventory-items.ts @@ -0,0 +1,59 @@ +import { IInventoryService, ProductTypes } from "@medusajs/types" + +import { InputAlias } from "../definitions" +import { WorkflowArguments } from "../helper" + +export async function createInventoryItems({ + container, + data, +}: WorkflowArguments & { + data: { + [InputAlias.Products]: ProductTypes.ProductDTO[] + } +}) { + const manager = container.resolve("manager") + const inventoryService: IInventoryService = + container.resolve("inventoryService") + const context = { transactionManager: manager } + + const products = data[InputAlias.Products] + const variants = products.reduce( + ( + acc: ProductTypes.ProductVariantDTO[], + product: ProductTypes.ProductDTO + ) => { + return acc.concat(product.variants) + }, + [] + ) + + const value = await Promise.all( + variants.map(async (variant) => { + if (!variant.manage_inventory) { + return + } + + const inventoryItem = await inventoryService!.createInventoryItem( + { + sku: variant.sku!, + origin_country: variant.origin_country!, + hs_code: variant.hs_code!, + mid_code: variant.mid_code!, + material: variant.material!, + weight: variant.weight!, + length: variant.length!, + height: variant.height!, + width: variant.width!, + }, + context + ) + + return { variant, inventoryItem } + }) + ) + + return { + alias: InputAlias.InventoryItems, + value, + } +} diff --git a/packages/workflows/src/handlers/create-products.ts b/packages/workflows/src/handlers/create-products.ts new file mode 100644 index 0000000000..539f167475 --- /dev/null +++ b/packages/workflows/src/handlers/create-products.ts @@ -0,0 +1,23 @@ +import { InputAlias } from "../definitions" +import { ProductTypes } from "@medusajs/types" +import { WorkflowArguments } from "../helper" + +export async function createProducts({ + container, + context, + data, +}: WorkflowArguments & { + data: { [InputAlias.Products]: ProductTypes.CreateProductDTO[] } +}) { + const productModuleService = container.resolve("productModuleService") + + const value = await productModuleService.create( + data[InputAlias.Products], + context + ) + + return { + alias: InputAlias.Products, + value, + } +} diff --git a/packages/workflows/src/handlers/index.ts b/packages/workflows/src/handlers/index.ts new file mode 100644 index 0000000000..0c5bc125f6 --- /dev/null +++ b/packages/workflows/src/handlers/index.ts @@ -0,0 +1,5 @@ +export * from "./remove-products" +export * from "./create-products" +export * from "./create-inventory-items" +export * from "./remove-inventory-items" +export * from "./attach-inventory-items" diff --git a/packages/workflows/src/handlers/remove-inventory-items.ts b/packages/workflows/src/handlers/remove-inventory-items.ts new file mode 100644 index 0000000000..301245f91d --- /dev/null +++ b/packages/workflows/src/handlers/remove-inventory-items.ts @@ -0,0 +1,31 @@ +import { InventoryItemDTO, MedusaContainer } from "@medusajs/types" + +import { InputAlias } from "../definitions" +import { WorkflowArguments } from "../helper" + +export async function removeInventoryItems({ + container, + data, +}: WorkflowArguments & { + data: { + [InputAlias.InventoryItems]: InventoryItemDTO + }[] +}) { + const manager = container.resolve("manager") + const inventoryService = container.resolve("inventoryService") + const context = { transactionManager: manager } + + const value = await Promise.all( + data.map(async ({ [InputAlias.InventoryItems]: inventoryItem }) => { + return await inventoryService!.deleteInventoryItem( + inventoryItem.id, + context + ) + }) + ) + + return { + alias: InputAlias.RemovedInventoryItems, + value, + } +} diff --git a/packages/workflows/src/handlers/remove-products.ts b/packages/workflows/src/handlers/remove-products.ts new file mode 100644 index 0000000000..b9a0dbe3a2 --- /dev/null +++ b/packages/workflows/src/handlers/remove-products.ts @@ -0,0 +1,22 @@ +import { InputAlias } from "../definitions" +import { ProductTypes } from "@medusajs/types" +import { WorkflowArguments } from "../helper" + +export async function removeProducts({ + container, + data, +}: WorkflowArguments & { + data: { + [InputAlias.Products]: ProductTypes.ProductDTO[] + } +}) { + const productModuleService = container.resolve("productModuleService") + const value = await productModuleService.softDelete( + data[InputAlias.Products].map((p) => p.id) + ) + + return { + alias: InputAlias.RemovedProducts, + value, + } +} diff --git a/packages/workflows/src/helper/empty-handler.ts b/packages/workflows/src/helper/empty-handler.ts new file mode 100644 index 0000000000..34233673d1 --- /dev/null +++ b/packages/workflows/src/helper/empty-handler.ts @@ -0,0 +1 @@ +export const emptyHandler: any = () => {} diff --git a/packages/workflows/src/helper/index.ts b/packages/workflows/src/helper/index.ts new file mode 100644 index 0000000000..4442dddba1 --- /dev/null +++ b/packages/workflows/src/helper/index.ts @@ -0,0 +1,3 @@ +export * from "./empty-handler" +export * from "./pipe" +export * from "./workflow-export" diff --git a/packages/workflows/src/helper/pipe.ts b/packages/workflows/src/helper/pipe.ts new file mode 100644 index 0000000000..6f150d2093 --- /dev/null +++ b/packages/workflows/src/helper/pipe.ts @@ -0,0 +1,98 @@ +import { Context, MedusaContainer, SharedContext } from "@medusajs/types" +import { + TransactionMetadata, + WorkflowStepHandler, +} from "@medusajs/orchestration" + +import { InputAlias } from "../definitions" + +type WorkflowStepReturn = { + alias: string + value: any +} + +type WorkflowStepInput = { + from: string + alias: string +} + +interface PipelineInput { + inputAlias?: InputAlias | string + invoke?: WorkflowStepInput | WorkflowStepInput[] + compensate?: WorkflowStepInput | WorkflowStepInput[] +} + +export type WorkflowArguments = { + container: MedusaContainer + payload: unknown + data: any + metadata: TransactionMetadata + context: Context | SharedContext +} + +export type PipelineHandler = ( + args: WorkflowArguments +) => Promise + +export function pipe( + input: PipelineInput, + ...functions: PipelineHandler[] +): WorkflowStepHandler { + return async ({ + container, + payload, + invoke, + compensate, + metadata, + context, + }) => { + const data = {} + + const original = { + invoke: invoke ?? {}, + compensate: compensate ?? {}, + } + + if (input.inputAlias) { + Object.assign(original.invoke, { [input.inputAlias]: payload }) + } + + for (const key in input) { + if (!input[key]) { + continue + } + + if (!Array.isArray(input[key])) { + input[key] = [input[key]] + } + + for (const action of input[key]) { + if (action?.alias) { + data[action.alias] = original[key][action.from] + } + } + } + + return functions.reduce(async (_, fn) => { + let result = await fn({ + container, + payload, + data, + metadata, + context: context as Context, + }) + + if (Array.isArray(result)) { + for (const action of result) { + if (action?.alias) { + data[action.alias] = action.value + } + } + } else if (result?.alias) { + data[result.alias] = result.value + } + + return result + }, {}) + } +} diff --git a/packages/workflows/src/helper/workflow-export.ts b/packages/workflows/src/helper/workflow-export.ts new file mode 100644 index 0000000000..b04a4a3887 --- /dev/null +++ b/packages/workflows/src/helper/workflow-export.ts @@ -0,0 +1,107 @@ +import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" +import { + DistributedTransaction, + LocalWorkflow, + TransactionState, + TransactionStepError, +} from "@medusajs/orchestration" + +import { EOL } from "os" +import { MedusaModule } from "@medusajs/modules-sdk" +import { Workflows } from "../definitions" +import { ulid } from "ulid" + +export type FlowRunOptions = { + input?: TData + context?: Context + resultFrom?: string | string[] + throwOnError?: boolean +} + +export type WorkflowResult = { + errors: TransactionStepError[] + transaction: DistributedTransaction + result: TResult +} + +export const exportWorkflow = ( + workflowId: Workflows, + defaultResult?: string +) => { + return function ( + container?: LoadedModule[] | MedusaContainer + ): Omit & { + run: ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + } { + if (!container) { + container = MedusaModule.getLoadedModules().map( + (mod) => Object.values(mod)[0] + ) + } + + const flow = new LocalWorkflow(workflowId, container) + + const originalRun = flow.run.bind(flow) + const newRun = async ( + { input, context, throwOnError, resultFrom }: FlowRunOptions = { + throwOnError: true, + resultFrom: defaultResult, + } + ) => { + const transaction = await originalRun( + context?.transactionId ?? ulid(), + input, + context + ) + + const errors = transaction.getErrors() + + const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED] + if (failedStatus.includes(transaction.getState()) && throwOnError) { + const errorMessage = errors + ?.map((err) => `${err.error?.message}${EOL}${err.error?.stack}`) + ?.join(`${EOL}`) + throw new Error(errorMessage) + } + + let result: any = undefined + + if (resultFrom) { + if (Array.isArray(resultFrom)) { + result = resultFrom.map( + (from) => transaction.getContext().invoke?.[from] + ) + } else { + result = transaction.getContext().invoke?.[resultFrom] + } + } + + return { + errors, + transaction, + result, + } + } + flow.run = newRun as any + + return flow as unknown as LocalWorkflow & { + run: ( + args?: FlowRunOptions< + TDataOverride extends undefined ? TData : TDataOverride + > + ) => Promise< + WorkflowResult< + TResultOverride extends undefined ? TResult : TResultOverride + > + > + } + } +} diff --git a/packages/workflows/src/index.ts b/packages/workflows/src/index.ts new file mode 100644 index 0000000000..2816d42f49 --- /dev/null +++ b/packages/workflows/src/index.ts @@ -0,0 +1,4 @@ +export * from "./definition" +export * from "./definitions" +export * as Handlers from "./handlers" +export * from "./helper" diff --git a/packages/workflows/tsconfig.json b/packages/workflows/tsconfig.json new file mode 100644 index 0000000000..d27e01678b --- /dev/null +++ b/packages/workflows/tsconfig.json @@ -0,0 +1,29 @@ +{ + "compilerOptions": { + "lib": ["es2020"], + "target": "es2020", + "outDir": "./dist", + "esModuleInterop": true, + "declaration": true, + "module": "commonjs", + "moduleResolution": "node", + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "sourceMap": true, + "noImplicitReturns": true, + "strictNullChecks": true, + "strictFunctionTypes": true, + "noImplicitThis": true, + "allowJs": true, + "skipLibCheck": true, + "downlevelIteration": true + }, + "include": ["src"], + "exclude": [ + "dist", + "./src/**/__tests__", + "./src/**/__mocks__", + "./src/**/__fixtures__", + "node_modules" + ] +} diff --git a/packages/workflows/tsconfig.spec.json b/packages/workflows/tsconfig.spec.json new file mode 100644 index 0000000000..b800dda7ee --- /dev/null +++ b/packages/workflows/tsconfig.spec.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["src"], + "exclude": ["node_modules", "dist"] +} diff --git a/yarn.lock b/yarn.lock index 8f14a9ab93..d0a2773402 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6343,6 +6343,7 @@ __metadata: dependencies: "@medusajs/medusa-cli": ^1.3.17 "@medusajs/modules-sdk": ^1.8.8 + "@medusajs/orchestration": ^0.0.2 "@medusajs/types": ^1.8.11 "@medusajs/utils": ^1.9.2 "@types/express": ^4.17.17 @@ -6564,6 +6565,24 @@ __metadata: languageName: unknown linkType: soft +"@medusajs/workflows@workspace:packages/workflows": + version: 0.0.0-use.local + resolution: "@medusajs/workflows@workspace:packages/workflows" + dependencies: + "@medusajs/modules-sdk": ^1.8.8 + "@medusajs/orchestration": ^0.0.2 + "@medusajs/types": ^1.8.10 + "@medusajs/utils": ^1.9.2 + awilix: ^8.0.1 + cross-env: ^5.2.1 + jest: ^25.5.4 + rimraf: ^5.0.1 + ts-jest: ^25.5.1 + typescript: ^4.4.4 + ulid: ^2.3.0 + languageName: unknown + linkType: soft + "@microsoft/fetch-event-source@npm:2.0.1": version: 2.0.1 resolution: "@microsoft/fetch-event-source@npm:2.0.1"