diff --git a/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts b/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts index 36768de3a6..0aad3b7212 100644 --- a/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts +++ b/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts @@ -1,3 +1,7 @@ +import { + createShippingOptionsWorkflow, + updateShippingOptionsWorkflow, +} from "@medusajs/core-flows" import { ModuleRegistrationName } from "@medusajs/modules-sdk" import { FulfillmentSetDTO, @@ -8,16 +12,12 @@ import { ShippingProfileDTO, UpdateShippingOptionsWorkflowInput, } from "@medusajs/types" -import { medusaIntegrationTestRunner } from "medusa-test-utils/dist" -import { - createShippingOptionsWorkflow, - updateShippingOptionsWorkflow, -} from "@medusajs/core-flows" import { ContainerRegistrationKeys, - remoteQueryObjectFromString, RuleOperator, + remoteQueryObjectFromString, } from "@medusajs/utils" +import { medusaIntegrationTestRunner } from "medusa-test-utils/dist" jest.setTimeout(100000) diff --git a/packages/event-bus-redis/src/services/event-bus-redis.ts b/packages/event-bus-redis/src/services/event-bus-redis.ts index 628ada4933..a7b92093ba 100644 --- a/packages/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/event-bus-redis/src/services/event-bus-redis.ts @@ -62,10 +62,13 @@ export default class RedisEventBusService extends AbstractEventBusModuleService __hooks = { onApplicationShutdown: async () => { - await this.bullWorker_?.close(true) await this.queue_.close() + // eslint-disable-next-line max-len this.eventBusRedisConnection_.disconnect() }, + onApplicationPrepareShutdown: async () => { + await this.bullWorker_?.close() + }, } /** diff --git a/packages/medusa/src/commands/start-cluster.js b/packages/medusa/src/commands/start-cluster.js index efd9026257..9a2253b34e 100644 --- a/packages/medusa/src/commands/start-cluster.js +++ b/packages/medusa/src/commands/start-cluster.js @@ -54,7 +54,7 @@ export default async function ({ port, cpus, directory }) { const app = express() - const { dbConnection, shutdown } = await loaders({ + const { dbConnection, shutdown, prepareShutdown } = await loaders({ directory, expressApp: app, }) @@ -72,10 +72,12 @@ export default async function ({ port, cpus, directory }) { const gracefulShutDown = () => { server .shutdown() - .then(() => { - shutdown().then(() => { - process.exit(0) - }) + .then(async () => { + return await prepareShutdown() + }) + .then(async () => { + await shutdown() + process.exit(0) }) .catch((e) => { process.exit(1) diff --git a/packages/medusa/src/commands/start.js b/packages/medusa/src/commands/start.js index e3bbda43cc..fb735f45a6 100644 --- a/packages/medusa/src/commands/start.js +++ b/packages/medusa/src/commands/start.js @@ -19,7 +19,7 @@ export default async function ({ port, directory }) { const app = express() try { - const { dbConnection, shutdown } = await loaders({ + const { dbConnection, shutdown, prepareShutdown } = await loaders({ directory, expressApp: app, }) @@ -37,13 +37,15 @@ export default async function ({ port, directory }) { // Handle graceful shutdown const gracefulShutDown = () => { + Logger.info("Gracefully shutting down server") server .shutdown() - .then(() => { - Logger.info("Gracefully stopping the server.") - shutdown().then(() => { - process.exit(0) - }) + .then(async () => { + return await prepareShutdown() + }) + .then(async () => { + await shutdown() + process.exit(0) }) .catch((e) => { Logger.error("Error received when shutting down the server.", e) diff --git a/packages/medusa/src/loaders/helpers/register-workflows.ts b/packages/medusa/src/loaders/helpers/register-workflows.ts new file mode 100644 index 0000000000..0ddea527ab --- /dev/null +++ b/packages/medusa/src/loaders/helpers/register-workflows.ts @@ -0,0 +1,21 @@ +import { PluginDetails } from "@medusajs/types" +import { glob } from "glob" +import { getResolvedPlugins } from "./resolve-plugins" + +/** + * import files from the workflows directory to run the registration of the wofklows + * @param pluginDetails + */ +async function registerWorkflows(pluginDetails: PluginDetails): Promise { + const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {}) + await Promise.all(files.map(async (file) => import(file))) +} + +export async function registerProjectWorkflows({ + rootDirectory, + configModule, +}) { + const [resolved] = + getResolvedPlugins(rootDirectory, configModule, "dist", true) || [] + await registerWorkflows(resolved) +} diff --git a/packages/medusa/src/loaders/helpers/resolve-plugins.ts b/packages/medusa/src/loaders/helpers/resolve-plugins.ts new file mode 100644 index 0000000000..2fe17ff9ac --- /dev/null +++ b/packages/medusa/src/loaders/helpers/resolve-plugins.ts @@ -0,0 +1,151 @@ +import { ConfigModule, PluginDetails } from "@medusajs/types" +import { isString } from "@medusajs/utils" +import fs from "fs" +import { sync as existsSync } from "fs-exists-cached" +import { createRequireFromPath } from "medusa-core-utils" +import path from "path" +import { MEDUSA_PROJECT_NAME } from "../plugins" + +function createPluginId(name: string): string { + return name +} + +function createFileContentHash(path, files): string { + return path + files +} + +/** + * Finds the correct path for the plugin. If it is a local plugin it will be + * found in the plugins folder. Otherwise we will look for the plugin in the + * installed npm packages. + * @param {string} pluginName - the name of the plugin to find. Should match + * the name of the folder where the plugin is contained. + * @return {object} the plugin details + */ +function resolvePlugin(pluginName: string): { + resolve: string + id: string + name: string + options: Record + version: string +} { + // Only find plugins when we're not given an absolute path + if (!existsSync(pluginName)) { + // Find the plugin in the local plugins folder + const resolvedPath = path.resolve(`./plugins/${pluginName}`) + + if (existsSync(resolvedPath)) { + if (existsSync(`${resolvedPath}/package.json`)) { + const packageJSON = JSON.parse( + fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) + ) + const name = packageJSON.name || pluginName + // warnOnIncompatiblePeerDependency(name, packageJSON) + + return { + resolve: resolvedPath, + name, + id: createPluginId(name), + options: {}, + version: + packageJSON.version || createFileContentHash(resolvedPath, `**`), + } + } else { + // Make package.json a requirement for local plugins too + throw new Error(`Plugin ${pluginName} requires a package.json file`) + } + } + } + + const rootDir = path.resolve(".") + + /** + * Here we have an absolute path to an internal plugin, or a name of a module + * which should be located in node_modules. + */ + try { + const requireSource = + rootDir !== null + ? createRequireFromPath(`${rootDir}/:internal:`) + : require + + // If the path is absolute, resolve the directory of the internal plugin, + // otherwise resolve the directory containing the package.json + const resolvedPath = path.dirname( + requireSource.resolve(`${pluginName}/package.json`) + ) + + const packageJSON = JSON.parse( + fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) + ) + // warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON) + + const computedResolvedPath = + resolvedPath + (process.env.DEV_MODE ? "/src" : "") + + // Add support for a plugin to output the build into a dist directory + const resolvedPathToDist = resolvedPath + "/dist" + const isDistExist = + resolvedPathToDist && + !process.env.DEV_MODE && + existsSync(resolvedPath + "/dist") + + return { + resolve: isDistExist ? resolvedPathToDist : computedResolvedPath, + id: createPluginId(packageJSON.name), + name: packageJSON.name, + options: {}, + version: packageJSON.version, + } + } catch (err) { + throw new Error( + `Unable to find plugin "${pluginName}". Perhaps you need to install its package?` + ) + } +} + +export function getResolvedPlugins( + rootDirectory: string, + configModule: ConfigModule, + extensionDirectoryPath = "dist", + isMedusaProject = false +): undefined | PluginDetails[] { + const { plugins } = configModule + + if (isMedusaProject) { + const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) + + return [ + { + resolve: extensionDirectory, + name: MEDUSA_PROJECT_NAME, + id: createPluginId(MEDUSA_PROJECT_NAME), + options: configModule, + version: createFileContentHash(process.cwd(), `**`), + }, + ] + } + + const resolved = plugins.map((plugin) => { + if (isString(plugin)) { + return resolvePlugin(plugin) + } + + const details = resolvePlugin(plugin.resolve) + details.options = plugin.options + + return details + }) + + const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) + // Resolve user's project as a plugin for loading purposes + resolved.push({ + resolve: extensionDirectory, + name: MEDUSA_PROJECT_NAME, + id: createPluginId(MEDUSA_PROJECT_NAME), + options: configModule, + version: createFileContentHash(process.cwd(), `**`), + }) + + return resolved +} diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index e7a0f0779b..305e39cb20 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -6,8 +6,9 @@ import { import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types" import { ContainerRegistrationKeys, + MedusaV2Flag, isString, - MedusaV2Flag, promiseAll, + promiseAll, } from "@medusajs/utils" import { asValue } from "awilix" import { Express, NextFunction, Request, Response } from "express" @@ -24,6 +25,7 @@ import databaseLoader, { dataSource } from "./database" import defaultsLoader from "./defaults" import expressLoader from "./express" import featureFlagsLoader from "./feature-flags" +import { registerProjectWorkflows } from "./helpers/register-workflows" import medusaProjectApisLoader from "./load-medusa-project-apis" import Logger from "./logger" import loadMedusaApp, { mergeDefaultModules } from "./medusa-app" @@ -102,7 +104,14 @@ async function loadMedusaV2({ [ContainerRegistrationKeys.REMOTE_QUERY]: asValue(null), }) - const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ + // Workflows are registered before the app to allow modules to run workflows as part of bootstrapping + // e.g. the workflow engine will resume workflows that were running when the server was shut down + await registerProjectWorkflows({ rootDirectory, configModule }) + + const { + onApplicationShutdown: medusaAppOnApplicationShutdown, + onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown, + } = await loadMedusaApp({ configModule, container, }) @@ -149,11 +158,13 @@ async function loadMedusaV2({ await createDefaultsWorkflow(container).run() const shutdown = async () => { + await medusaAppOnApplicationShutdown() + await promiseAll([ container.dispose(), pgConnection?.context?.destroy(), expressShutdown(), - medusaAppOnApplicationShutdown() + medusaAppOnApplicationShutdown(), ]) } @@ -163,6 +174,7 @@ async function loadMedusaV2({ app: expressApp, pgConnection, shutdown, + prepareShutdown: medusaAppOnApplicationPrepareShutdown, } } @@ -177,6 +189,7 @@ export default async ({ app: Express pgConnection: unknown shutdown: () => Promise + prepareShutdown: () => Promise }> => { const configModule = loadConfig(rootDirectory) const featureFlagRouter = featureFlagsLoader(configModule, Logger) @@ -211,7 +224,11 @@ export default async ({ featureFlagRouter: asValue(featureFlagRouter), }) - const { shutdown: redisShutdown } = await redisLoader({ container, configModule, logger: Logger }) + const { shutdown: redisShutdown } = await redisLoader({ + container, + configModule, + logger: Logger, + }) const modelsActivity = Logger.activity(`Initializing models${EOL}`) track("MODELS_INIT_STARTED") @@ -271,7 +288,10 @@ export default async ({ track("MODULES_INIT_STARTED") // Move before services init once all modules are migrated and do not rely on core resources anymore - const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ + const { + onApplicationShutdown: medusaAppOnApplicationShutdown, + onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown, + } = await loadMedusaApp({ configModule, container, }) @@ -281,7 +301,10 @@ export default async ({ const expActivity = Logger.activity(`Initializing express${EOL}`) track("EXPRESS_INIT_STARTED") - const { shutdown: expressShutdown } = await expressLoader({ app: expressApp, configModule }) + const { shutdown: expressShutdown } = await expressLoader({ + app: expressApp, + configModule, + }) await passportLoader({ app: expressApp, configModule }) const exAct = Logger.success(expActivity, "Express intialized") || {} track("EXPRESS_INIT_COMPLETED", { duration: exAct.duration }) @@ -339,13 +362,14 @@ export default async ({ track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration }) async function shutdown() { + await medusaAppOnApplicationShutdown() + await promiseAll([ container.dispose(), dbConnection?.destroy(), pgConnection?.context?.destroy(), redisShutdown(), expressShutdown(), - medusaAppOnApplicationShutdown(), ]) } @@ -356,5 +380,6 @@ export default async ({ app: expressApp, pgConnection, shutdown, + prepareShutdown: async () => medusaAppOnApplicationPrepareShutdown(), } } diff --git a/packages/medusa/src/loaders/load-medusa-project-apis.ts b/packages/medusa/src/loaders/load-medusa-project-apis.ts index 8dbe27acef..d71327b4d4 100644 --- a/packages/medusa/src/loaders/load-medusa-project-apis.ts +++ b/packages/medusa/src/loaders/load-medusa-project-apis.ts @@ -1,12 +1,12 @@ import { promiseAll } from "@medusajs/utils" import { Express } from "express" import glob from "glob" -import _ from "lodash" import { trackInstallation } from "medusa-telemetry" import { EOL } from "os" import path from "path" import { ConfigModule, Logger, MedusaContainer } from "../types/global" import ScheduledJobsLoader from "./helpers/jobs" +import { getResolvedPlugins } from "./helpers/resolve-plugins" import { RoutesLoader } from "./helpers/routing" import { SubscriberLoader } from "./helpers/subscribers" import logger from "./logger" @@ -55,7 +55,6 @@ export default async ({ ) } await registerSubscribers(pluginDetails, container, activityId) - await registerWorkflows(pluginDetails) }) ) @@ -78,23 +77,6 @@ export default async ({ resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin")) } -function getResolvedPlugins( - rootDirectory: string, - configModule: ConfigModule, - extensionDirectoryPath = "dist" -): undefined | PluginDetails[] { - const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) - return [ - { - resolve: extensionDirectory, - name: MEDUSA_PROJECT_NAME, - id: createPluginId(MEDUSA_PROJECT_NAME), - options: configModule, - version: createFileContentHash(process.cwd(), `**`), - }, - ] -} - async function runLoaders( pluginDetails: PluginDetails, container: MedusaContainer @@ -191,21 +173,3 @@ async function registerSubscribers( true ).load() } - -/** - * import files from the workflows directory to run the registration of the wofklows - * @param pluginDetails - */ -async function registerWorkflows(pluginDetails: PluginDetails): Promise { - const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {}) - await Promise.all(files.map(async (file) => import(file))) -} - -// TODO: Create unique id for each plugin -function createPluginId(name: string): string { - return name -} - -function createFileContentHash(path, files): string { - return path + files -} diff --git a/packages/medusa/src/loaders/plugins.ts b/packages/medusa/src/loaders/plugins.ts index 97e2399837..f9afbaa893 100644 --- a/packages/medusa/src/loaders/plugins.ts +++ b/packages/medusa/src/loaders/plugins.ts @@ -3,13 +3,9 @@ import { promiseAll, upperCaseFirst, } from "@medusajs/utils" -import { aliasTo, asFunction, asValue, Lifetime } from "awilix" +import { Lifetime, aliasTo, asFunction, asValue } from "awilix" import { Express } from "express" -import fs from "fs" -import { sync as existsSync } from "fs-exists-cached" import glob from "glob" -import _ from "lodash" -import { createRequireFromPath } from "medusa-core-utils" import { OauthService } from "medusa-interfaces" import { trackInstallation } from "medusa-telemetry" import { EOL } from "os" @@ -41,6 +37,7 @@ import { registerAbstractFulfillmentServiceFromClass, registerPaymentProcessorFromClass, } from "./helpers/plugins" +import { getResolvedPlugins } from "./helpers/resolve-plugins" import { RoutesLoader } from "./helpers/routing" import { SubscriberLoader } from "./helpers/subscribers" import logger from "./logger" @@ -121,37 +118,6 @@ export default async ({ resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin")) } -function getResolvedPlugins( - rootDirectory: string, - configModule: ConfigModule, - extensionDirectoryPath = "dist" -): undefined | PluginDetails[] { - const { plugins } = configModule - - const resolved = plugins.map((plugin) => { - if (_.isString(plugin)) { - return resolvePlugin(plugin) - } - - const details = resolvePlugin(plugin.resolve) - details.options = plugin.options - - return details - }) - - const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) - // Resolve user's project as a plugin for loading purposes - resolved.push({ - resolve: extensionDirectory, - name: MEDUSA_PROJECT_NAME, - id: createPluginId(MEDUSA_PROJECT_NAME), - options: configModule, - version: createFileContentHash(process.cwd(), `**`), - }) - - return resolved -} - export async function registerPluginModels({ rootDirectory, container, @@ -753,102 +719,3 @@ async function runSetupFunctions(pluginDetails: PluginDetails): Promise { }) ) } - -// TODO: Create unique id for each plugin -function createPluginId(name: string): string { - return name -} - -/** - * Finds the correct path for the plugin. If it is a local plugin it will be - * found in the plugins folder. Otherwise we will look for the plugin in the - * installed npm packages. - * @param {string} pluginName - the name of the plugin to find. Should match - * the name of the folder where the plugin is contained. - * @return {object} the plugin details - */ -function resolvePlugin(pluginName: string): { - resolve: string - id: string - name: string - options: Record - version: string -} { - // Only find plugins when we're not given an absolute path - if (!existsSync(pluginName)) { - // Find the plugin in the local plugins folder - const resolvedPath = path.resolve(`./plugins/${pluginName}`) - - if (existsSync(resolvedPath)) { - if (existsSync(`${resolvedPath}/package.json`)) { - const packageJSON = JSON.parse( - fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) - ) - const name = packageJSON.name || pluginName - // warnOnIncompatiblePeerDependency(name, packageJSON) - - return { - resolve: resolvedPath, - name, - id: createPluginId(name), - options: {}, - version: - packageJSON.version || createFileContentHash(resolvedPath, `**`), - } - } else { - // Make package.json a requirement for local plugins too - throw new Error(`Plugin ${pluginName} requires a package.json file`) - } - } - } - - const rootDir = path.resolve(".") - - /** - * Here we have an absolute path to an internal plugin, or a name of a module - * which should be located in node_modules. - */ - try { - const requireSource = - rootDir !== null - ? createRequireFromPath(`${rootDir}/:internal:`) - : require - - // If the path is absolute, resolve the directory of the internal plugin, - // otherwise resolve the directory containing the package.json - const resolvedPath = path.dirname( - requireSource.resolve(`${pluginName}/package.json`) - ) - - const packageJSON = JSON.parse( - fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) - ) - // warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON) - - const computedResolvedPath = - resolvedPath + (process.env.DEV_MODE ? "/src" : "") - - // Add support for a plugin to output the build into a dist directory - const resolvedPathToDist = resolvedPath + "/dist" - const isDistExist = - resolvedPathToDist && - !process.env.DEV_MODE && - existsSync(resolvedPath + "/dist") - - return { - resolve: isDistExist ? resolvedPathToDist : computedResolvedPath, - id: createPluginId(packageJSON.name), - name: packageJSON.name, - options: {}, - version: packageJSON.version, - } - } catch (err) { - throw new Error( - `Unable to find plugin "${pluginName}". Perhaps you need to install its package?` - ) - } -} - -function createFileContentHash(path, files): string { - return path + files -} diff --git a/packages/modules-sdk/src/loaders/utils/load-internal.ts b/packages/modules-sdk/src/loaders/utils/load-internal.ts index bd6fb6abb2..dfe97b6bf9 100644 --- a/packages/modules-sdk/src/loaders/utils/load-internal.ts +++ b/packages/modules-sdk/src/loaders/utils/load-internal.ts @@ -141,6 +141,7 @@ export async function loadInternalModule( if (loaderOnly) { // The expectation is only to run the loader as standalone, so we do not need to register the service and we need to cleanup all services const service = container.resolve(registrationName) + await service.__hooks?.onApplicationPrepareShutdown() await service.__hooks?.onApplicationShutdown() } } diff --git a/packages/modules-sdk/src/medusa-app.ts b/packages/modules-sdk/src/medusa-app.ts index 80baeb7b32..9972dabcf6 100644 --- a/packages/modules-sdk/src/medusa-app.ts +++ b/packages/modules-sdk/src/medusa-app.ts @@ -204,6 +204,7 @@ export type MedusaAppOutput = { notFound?: Record> runMigrations: RunMigrationFn onApplicationShutdown: () => Promise + onApplicationPrepareShutdown: () => Promise } export type MedusaAppOptions = { @@ -243,6 +244,7 @@ async function MedusaApp_({ migrationOnly?: boolean } = {}): Promise { const sharedContainer_ = createMedusaContainer({}, sharedContainer) + const onApplicationShutdown = async () => { await promiseAll([ MedusaModule.onApplicationShutdown(), @@ -250,6 +252,10 @@ async function MedusaApp_({ ]) } + const onApplicationPrepareShutdown = async () => { + await promiseAll([MedusaModule.onApplicationPrepareShutdown()]) + } + const modules: MedusaModuleConfig = modulesConfig ?? ( @@ -312,6 +318,7 @@ async function MedusaApp_({ if (loaderOnly) { return { onApplicationShutdown, + onApplicationPrepareShutdown, modules: allModules, link: undefined, query: async () => { @@ -388,6 +395,7 @@ async function MedusaApp_({ return { onApplicationShutdown, + onApplicationPrepareShutdown, modules: allModules, link: remoteLink, query, diff --git a/packages/modules-sdk/src/medusa-module.ts b/packages/modules-sdk/src/medusa-module.ts index 2c03d1b43d..f3b4275449 100644 --- a/packages/modules-sdk/src/medusa-module.ts +++ b/packages/modules-sdk/src/medusa-module.ts @@ -136,6 +136,23 @@ export class MedusaModule { ) } + public static async onApplicationPrepareShutdown(): Promise { + await promiseAll( + [...MedusaModule.instances_.values()] + .map((instances) => { + return Object.values(instances).map((instance: IModuleService) => { + return instance.__hooks?.onApplicationPrepareShutdown + ?.bind(instance)() + .catch(() => { + // The module should handle this and log it + return void 0 + }) + }) + }) + .flat() + ) + } + public static clearInstances(): void { MedusaModule.instances_.clear() MedusaModule.modules_.clear() diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 0c31b17c37..a327734aeb 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -752,6 +752,9 @@ export class TransactionOrchestrator extends EventEmitter { await setStepFailure(error) }) + + const eventName = DistributedTransactionEvent.STEP_AWAITING + transaction.emit(eventName, { step, transaction }) }) ) } diff --git a/packages/orchestration/src/transaction/types.ts b/packages/orchestration/src/transaction/types.ts index 1ab854c72d..f113e447df 100644 --- a/packages/orchestration/src/transaction/types.ts +++ b/packages/orchestration/src/transaction/types.ts @@ -137,6 +137,7 @@ export enum DistributedTransactionEvent { STEP_BEGIN = "stepBegin", STEP_SUCCESS = "stepSuccess", STEP_FAILURE = "stepFailure", + STEP_AWAITING = "stepAwaiting", COMPENSATE_STEP_SUCCESS = "compensateStepSuccess", COMPENSATE_STEP_FAILURE = "compensateStepFailure", } @@ -166,6 +167,11 @@ export type DistributedTransactionEvents = { transaction: DistributedTransaction }) => void + onStepAwaiting?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void onCompensateStepSuccess?: (args: { diff --git a/packages/orchestration/src/workflow/global-workflow.ts b/packages/orchestration/src/workflow/global-workflow.ts index dd20ad8376..d806d4fa07 100644 --- a/packages/orchestration/src/workflow/global-workflow.ts +++ b/packages/orchestration/src/workflow/global-workflow.ts @@ -70,6 +70,10 @@ export class GlobalWorkflow extends WorkflowManager { transaction.once("stepFailure", this.subscribe.onStepFailure) } + if (this.subscribe.onStepAwaiting) { + transaction.once("stepAwaiting", this.subscribe.onStepAwaiting) + } + await orchestrator.resume(transaction) return transaction diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts index a9a58691eb..8b7f3c03bc 100644 --- a/packages/orchestration/src/workflow/local-workflow.ts +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -244,6 +244,13 @@ export class LocalWorkflow { ) } + if (subscribe?.onStepAwaiting) { + transaction.on( + DistributedTransactionEvent.STEP_AWAITING, + eventWrapperMap.get("onStepAwaiting") + ) + } + if (subscribe?.onCompensateStepSuccess) { transaction.on( DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS, diff --git a/packages/types/src/common/config-module.ts b/packages/types/src/common/config-module.ts index 7964e1cfbb..d8a42eb34b 100644 --- a/packages/types/src/common/config-module.ts +++ b/packages/types/src/common/config-module.ts @@ -3,8 +3,8 @@ import { InternalModuleDeclaration, } from "../modules-sdk" -import { LoggerOptions } from "typeorm" import { RedisOptions } from "ioredis" +import { LoggerOptions } from "typeorm" /** * @interface @@ -744,3 +744,11 @@ export type ConfigModule = { */ featureFlags: Record } + +export type PluginDetails = { + resolve: string + name: string + id: string + options: Record + version: string +} diff --git a/packages/types/src/modules-sdk/index.ts b/packages/types/src/modules-sdk/index.ts index 17e1e1b461..0f372a3b44 100644 --- a/packages/types/src/modules-sdk/index.ts +++ b/packages/types/src/modules-sdk/index.ts @@ -297,5 +297,6 @@ export interface IModuleService { __hooks?: { onApplicationStart?: () => Promise onApplicationShutdown?: () => Promise + onApplicationPrepareShutdown?: () => Promise } } diff --git a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 87dcd93e7c..de1b6f2a7c 100644 --- a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -483,11 +483,20 @@ export class WorkflowOrchestratorService { notify({ eventType: "onStepFailure", step, errors }) }, + onStepAwaiting: ({ step, transaction }) => { + customEventHandlers?.onStepAwaiting?.({ step, transaction }) + + notify({ eventType: "onStepAwaiting", step }) + }, onCompensateStepSuccess: ({ step, transaction }) => { const stepName = step.definition.action! const response = transaction.getContext().compensate[stepName] - customEventHandlers?.onStepSuccess?.({ step, transaction, response }) + customEventHandlers?.onCompensateStepSuccess?.({ + step, + transaction, + response, + }) notify({ eventType: "onCompensateStepSuccess", step, response }) }, diff --git a/packages/workflow-engine-redis/package.json b/packages/workflow-engine-redis/package.json index 4e6c27f262..dea00d9a19 100644 --- a/packages/workflow-engine-redis/package.json +++ b/packages/workflow-engine-redis/package.json @@ -53,7 +53,7 @@ "@mikro-orm/migrations": "5.9.7", "@mikro-orm/postgresql": "5.9.7", "awilix": "^8.0.0", - "bullmq": "^5.1.3", + "bullmq": "^5.4.2", "dotenv": "^16.4.5", "ioredis": "^5.3.2", "knex": "2.4.2" diff --git a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts index e4a7fdcdb1..7f6484394f 100644 --- a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -4,8 +4,13 @@ import { TransactionHandlerType, TransactionStep, } from "@medusajs/orchestration" -import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" +import { + ContainerLike, + Context, + Logger, + MedusaContainer, +} from "@medusajs/types" +import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" import { FlowRunOptions, MedusaWorkflow, @@ -75,6 +80,8 @@ export class WorkflowOrchestratorService { protected redisPublisher: Redis protected redisSubscriber: Redis private subscribers: Subscribers = new Map() + private activeStepsCount: number = 0 + private logger: Logger protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage @@ -83,15 +90,18 @@ export class WorkflowOrchestratorService { redisDistributedTransactionStorage, redisPublisher, redisSubscriber, + logger, }: { dataLoaderOnly: boolean redisDistributedTransactionStorage: RedisDistributedTransactionStorage workflowOrchestratorService: WorkflowOrchestratorService redisPublisher: Redis redisSubscriber: Redis + logger: Logger }) { this.redisPublisher = redisPublisher this.redisSubscriber = redisSubscriber + this.logger = logger redisDistributedTransactionStorage.setWorkflowOrchestratorService(this) @@ -113,6 +123,15 @@ export class WorkflowOrchestratorService { await this.redisDistributedTransactionStorage_.onApplicationShutdown() } + async onApplicationPrepareShutdown() { + // eslint-disable-next-line max-len + await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown() + + while (this.activeStepsCount > 0) { + await new Promise((resolve) => setTimeout(resolve, 1000)) + } + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, @@ -523,6 +542,7 @@ export class WorkflowOrchestratorService { onStepBegin: async ({ step, transaction }) => { customEventHandlers?.onStepBegin?.({ step, transaction }) + this.activeStepsCount++ await notify({ eventType: "onStepBegin", step }) }, @@ -533,8 +553,9 @@ export class WorkflowOrchestratorService { transaction ) customEventHandlers?.onStepSuccess?.({ step, transaction, response }) - await notify({ eventType: "onStepSuccess", step, response }) + + this.activeStepsCount-- }, onStepFailure: async ({ step, transaction }) => { const stepName = step.definition.action! @@ -543,14 +564,28 @@ export class WorkflowOrchestratorService { .filter((err) => err.action === stepName) customEventHandlers?.onStepFailure?.({ step, transaction, errors }) - await notify({ eventType: "onStepFailure", step, errors }) + + this.activeStepsCount-- + }, + onStepAwaiting: async ({ step, transaction }) => { + customEventHandlers?.onStepAwaiting?.({ step, transaction }) + + await notify({ eventType: "onStepAwaiting", step }) + + if (!step.definition.backgroundExecution) { + this.activeStepsCount-- + } }, onCompensateStepSuccess: async ({ step, transaction }) => { const stepName = step.definition.action! const response = transaction.getContext().compensate[stepName] - customEventHandlers?.onStepSuccess?.({ step, transaction, response }) + customEventHandlers?.onCompensateStepSuccess?.({ + step, + transaction, + response, + }) await notify({ eventType: "onCompensateStepSuccess", step, response }) }, diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index c53542703e..6dfe4f0614 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -59,6 +59,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { await this.workflowOrchestratorService_.onApplicationShutdown() await this.redisDisconnectHandler_() }, + onApplicationPrepareShutdown: async () => { + await this.workflowOrchestratorService_.onApplicationPrepareShutdown() + }, } @InjectManager("baseRepository_") diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index bd718f60ca..af19cd5904 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -64,9 +64,13 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt ) } + async onApplicationPrepareShutdown() { + // Close worker gracefully, i.e. wait for the current jobs to finish + await this.worker.close() + } + async onApplicationShutdown() { await this.queue.close() - await this.worker.close(true) } setWorkflowOrchestratorService(workflowOrchestratorService) { diff --git a/yarn.lock b/yarn.lock index cbdc7bdacd..d9b1716a2b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9221,7 +9221,7 @@ __metadata: "@mikro-orm/migrations": 5.9.7 "@mikro-orm/postgresql": 5.9.7 awilix: ^8.0.0 - bullmq: ^5.1.3 + bullmq: ^5.4.2 cross-env: ^5.2.1 dotenv: ^16.4.5 ioredis: ^5.3.2 @@ -22504,36 +22504,18 @@ __metadata: languageName: node linkType: hard -"bullmq@npm:^5.1.3": - version: 5.1.3 - resolution: "bullmq@npm:5.1.3" - dependencies: - cron-parser: ^4.6.0 - glob: ^8.0.3 - ioredis: ^5.3.2 - lodash: ^4.17.21 - msgpackr: ^1.10.1 - node-abort-controller: ^3.1.1 - semver: ^7.5.4 - tslib: ^2.0.0 - uuid: ^9.0.0 - checksum: dc2177dfd736b2d008ccab1ba9f77f80cc730ce6197c9ffa0f37327e1cf34bd8b97d83ee9f9008253ef0c0854bbd04f8c925889a3370a0899e8f5c7a34fd3ab3 - languageName: node - linkType: hard - "bullmq@npm:^5.4.2": - version: 5.4.2 - resolution: "bullmq@npm:5.4.2" + version: 5.7.3 + resolution: "bullmq@npm:5.7.3" dependencies: cron-parser: ^4.6.0 ioredis: ^5.3.2 - lodash: ^4.17.21 msgpackr: ^1.10.1 node-abort-controller: ^3.1.1 semver: ^7.5.4 tslib: ^2.0.0 uuid: ^9.0.0 - checksum: 01687a41bbacb646ab9cac181181c0cf18362f23b250c332125e0a701dacefb48ff20b030e095f24bd5e471f29d514bcc02779fd1f1c3cb830a3a5f1d0a39d75 + checksum: eade5853736a9ad606fe6f7b0d000585d7122ca3f5e4b71965cef448b77f3b8748f90ea420b90cf8e2fd7db737197946cc57ffbdb7ea8e8e4945875cc54b74d5 languageName: node linkType: hard