From 0eb68541b82f9af8677b6ed262fec53b15686199 Mon Sep 17 00:00:00 2001 From: Oli Juhl <59018053+olivermrbl@users.noreply.github.com> Date: Mon, 22 Apr 2024 14:32:50 +0200 Subject: [PATCH] fix: Resume workflow execution (#7103) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Supercedes #7051 – if OK, I'll move the base of this PR to `develop` and we can run reviews only of this one. **What** - Gracefully close BullMQ Worker in Redis Event Bus and Redis Workflow Engine (by @edast, @sradevski) - Register workflows before MedusaApp is loaded* - Introduce `onApplicationPrepareShutdown`** - Refactor plugin resolving for reusability purposes *We now register workflows before modules are loaded to ensure modules can run workflows as part of bootstrapping. E.g. the Redis Workflow Engine resumes workflows when it starts, which has until this change failed, because the workflows were not registered yet. **We introduce a new hook to prepare resources for an application shutdown. E.g. closing the BullMQ worker as a preparatory step to closing the BullMQ queue. The worker will continue to process jobs while the queue is still open to receive new jobs (without processing them). Co-authored-by: Stevche Radevski <4820812+sradevski@users.noreply.github.com> Co-authored-by: Darius <618221+edast@users.noreply.github.com> Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .../workflows/update-shipping-options.ts | 12 +- .../src/services/event-bus-redis.ts | 5 +- packages/medusa/src/commands/start-cluster.js | 12 +- packages/medusa/src/commands/start.js | 14 +- .../src/loaders/helpers/register-workflows.ts | 21 +++ .../src/loaders/helpers/resolve-plugins.ts | 151 ++++++++++++++++++ packages/medusa/src/loaders/index.ts | 39 ++++- .../src/loaders/load-medusa-project-apis.ts | 38 +---- packages/medusa/src/loaders/plugins.ts | 137 +--------------- .../src/loaders/utils/load-internal.ts | 1 + packages/modules-sdk/src/medusa-app.ts | 8 + packages/modules-sdk/src/medusa-module.ts | 17 ++ .../transaction/transaction-orchestrator.ts | 3 + .../orchestration/src/transaction/types.ts | 6 + .../src/workflow/global-workflow.ts | 4 + .../src/workflow/local-workflow.ts | 7 + packages/types/src/common/config-module.ts | 10 +- packages/types/src/modules-sdk/index.ts | 1 + .../src/services/workflow-orchestrator.ts | 11 +- packages/workflow-engine-redis/package.json | 2 +- .../src/services/workflow-orchestrator.ts | 45 +++++- .../src/services/workflows-module.ts | 3 + .../utils/workflow-orchestrator-storage.ts | 6 +- yarn.lock | 26 +-- 24 files changed, 351 insertions(+), 228 deletions(-) create mode 100644 packages/medusa/src/loaders/helpers/register-workflows.ts create mode 100644 packages/medusa/src/loaders/helpers/resolve-plugins.ts diff --git a/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts b/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts index 36768de3a6..0aad3b7212 100644 --- a/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts +++ b/integration-tests/modules/__tests__/shipping-options/workflows/update-shipping-options.ts @@ -1,3 +1,7 @@ +import { + createShippingOptionsWorkflow, + updateShippingOptionsWorkflow, +} from "@medusajs/core-flows" import { ModuleRegistrationName } from "@medusajs/modules-sdk" import { FulfillmentSetDTO, @@ -8,16 +12,12 @@ import { ShippingProfileDTO, UpdateShippingOptionsWorkflowInput, } from "@medusajs/types" -import { medusaIntegrationTestRunner } from "medusa-test-utils/dist" -import { - createShippingOptionsWorkflow, - updateShippingOptionsWorkflow, -} from "@medusajs/core-flows" import { ContainerRegistrationKeys, - remoteQueryObjectFromString, RuleOperator, + remoteQueryObjectFromString, } from "@medusajs/utils" +import { medusaIntegrationTestRunner } from "medusa-test-utils/dist" jest.setTimeout(100000) diff --git a/packages/event-bus-redis/src/services/event-bus-redis.ts b/packages/event-bus-redis/src/services/event-bus-redis.ts index 628ada4933..a7b92093ba 100644 --- a/packages/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/event-bus-redis/src/services/event-bus-redis.ts @@ -62,10 +62,13 @@ export default class RedisEventBusService extends AbstractEventBusModuleService __hooks = { onApplicationShutdown: async () => { - await this.bullWorker_?.close(true) await this.queue_.close() + // eslint-disable-next-line max-len this.eventBusRedisConnection_.disconnect() }, + onApplicationPrepareShutdown: async () => { + await this.bullWorker_?.close() + }, } /** diff --git a/packages/medusa/src/commands/start-cluster.js b/packages/medusa/src/commands/start-cluster.js index efd9026257..9a2253b34e 100644 --- a/packages/medusa/src/commands/start-cluster.js +++ b/packages/medusa/src/commands/start-cluster.js @@ -54,7 +54,7 @@ export default async function ({ port, cpus, directory }) { const app = express() - const { dbConnection, shutdown } = await loaders({ + const { dbConnection, shutdown, prepareShutdown } = await loaders({ directory, expressApp: app, }) @@ -72,10 +72,12 @@ export default async function ({ port, cpus, directory }) { const gracefulShutDown = () => { server .shutdown() - .then(() => { - shutdown().then(() => { - process.exit(0) - }) + .then(async () => { + return await prepareShutdown() + }) + .then(async () => { + await shutdown() + process.exit(0) }) .catch((e) => { process.exit(1) diff --git a/packages/medusa/src/commands/start.js b/packages/medusa/src/commands/start.js index e3bbda43cc..fb735f45a6 100644 --- a/packages/medusa/src/commands/start.js +++ b/packages/medusa/src/commands/start.js @@ -19,7 +19,7 @@ export default async function ({ port, directory }) { const app = express() try { - const { dbConnection, shutdown } = await loaders({ + const { dbConnection, shutdown, prepareShutdown } = await loaders({ directory, expressApp: app, }) @@ -37,13 +37,15 @@ export default async function ({ port, directory }) { // Handle graceful shutdown const gracefulShutDown = () => { + Logger.info("Gracefully shutting down server") server .shutdown() - .then(() => { - Logger.info("Gracefully stopping the server.") - shutdown().then(() => { - process.exit(0) - }) + .then(async () => { + return await prepareShutdown() + }) + .then(async () => { + await shutdown() + process.exit(0) }) .catch((e) => { Logger.error("Error received when shutting down the server.", e) diff --git a/packages/medusa/src/loaders/helpers/register-workflows.ts b/packages/medusa/src/loaders/helpers/register-workflows.ts new file mode 100644 index 0000000000..0ddea527ab --- /dev/null +++ b/packages/medusa/src/loaders/helpers/register-workflows.ts @@ -0,0 +1,21 @@ +import { PluginDetails } from "@medusajs/types" +import { glob } from "glob" +import { getResolvedPlugins } from "./resolve-plugins" + +/** + * import files from the workflows directory to run the registration of the wofklows + * @param pluginDetails + */ +async function registerWorkflows(pluginDetails: PluginDetails): Promise { + const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {}) + await Promise.all(files.map(async (file) => import(file))) +} + +export async function registerProjectWorkflows({ + rootDirectory, + configModule, +}) { + const [resolved] = + getResolvedPlugins(rootDirectory, configModule, "dist", true) || [] + await registerWorkflows(resolved) +} diff --git a/packages/medusa/src/loaders/helpers/resolve-plugins.ts b/packages/medusa/src/loaders/helpers/resolve-plugins.ts new file mode 100644 index 0000000000..2fe17ff9ac --- /dev/null +++ b/packages/medusa/src/loaders/helpers/resolve-plugins.ts @@ -0,0 +1,151 @@ +import { ConfigModule, PluginDetails } from "@medusajs/types" +import { isString } from "@medusajs/utils" +import fs from "fs" +import { sync as existsSync } from "fs-exists-cached" +import { createRequireFromPath } from "medusa-core-utils" +import path from "path" +import { MEDUSA_PROJECT_NAME } from "../plugins" + +function createPluginId(name: string): string { + return name +} + +function createFileContentHash(path, files): string { + return path + files +} + +/** + * Finds the correct path for the plugin. If it is a local plugin it will be + * found in the plugins folder. Otherwise we will look for the plugin in the + * installed npm packages. + * @param {string} pluginName - the name of the plugin to find. Should match + * the name of the folder where the plugin is contained. + * @return {object} the plugin details + */ +function resolvePlugin(pluginName: string): { + resolve: string + id: string + name: string + options: Record + version: string +} { + // Only find plugins when we're not given an absolute path + if (!existsSync(pluginName)) { + // Find the plugin in the local plugins folder + const resolvedPath = path.resolve(`./plugins/${pluginName}`) + + if (existsSync(resolvedPath)) { + if (existsSync(`${resolvedPath}/package.json`)) { + const packageJSON = JSON.parse( + fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) + ) + const name = packageJSON.name || pluginName + // warnOnIncompatiblePeerDependency(name, packageJSON) + + return { + resolve: resolvedPath, + name, + id: createPluginId(name), + options: {}, + version: + packageJSON.version || createFileContentHash(resolvedPath, `**`), + } + } else { + // Make package.json a requirement for local plugins too + throw new Error(`Plugin ${pluginName} requires a package.json file`) + } + } + } + + const rootDir = path.resolve(".") + + /** + * Here we have an absolute path to an internal plugin, or a name of a module + * which should be located in node_modules. + */ + try { + const requireSource = + rootDir !== null + ? createRequireFromPath(`${rootDir}/:internal:`) + : require + + // If the path is absolute, resolve the directory of the internal plugin, + // otherwise resolve the directory containing the package.json + const resolvedPath = path.dirname( + requireSource.resolve(`${pluginName}/package.json`) + ) + + const packageJSON = JSON.parse( + fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) + ) + // warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON) + + const computedResolvedPath = + resolvedPath + (process.env.DEV_MODE ? "/src" : "") + + // Add support for a plugin to output the build into a dist directory + const resolvedPathToDist = resolvedPath + "/dist" + const isDistExist = + resolvedPathToDist && + !process.env.DEV_MODE && + existsSync(resolvedPath + "/dist") + + return { + resolve: isDistExist ? resolvedPathToDist : computedResolvedPath, + id: createPluginId(packageJSON.name), + name: packageJSON.name, + options: {}, + version: packageJSON.version, + } + } catch (err) { + throw new Error( + `Unable to find plugin "${pluginName}". Perhaps you need to install its package?` + ) + } +} + +export function getResolvedPlugins( + rootDirectory: string, + configModule: ConfigModule, + extensionDirectoryPath = "dist", + isMedusaProject = false +): undefined | PluginDetails[] { + const { plugins } = configModule + + if (isMedusaProject) { + const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) + + return [ + { + resolve: extensionDirectory, + name: MEDUSA_PROJECT_NAME, + id: createPluginId(MEDUSA_PROJECT_NAME), + options: configModule, + version: createFileContentHash(process.cwd(), `**`), + }, + ] + } + + const resolved = plugins.map((plugin) => { + if (isString(plugin)) { + return resolvePlugin(plugin) + } + + const details = resolvePlugin(plugin.resolve) + details.options = plugin.options + + return details + }) + + const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) + // Resolve user's project as a plugin for loading purposes + resolved.push({ + resolve: extensionDirectory, + name: MEDUSA_PROJECT_NAME, + id: createPluginId(MEDUSA_PROJECT_NAME), + options: configModule, + version: createFileContentHash(process.cwd(), `**`), + }) + + return resolved +} diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index e7a0f0779b..305e39cb20 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -6,8 +6,9 @@ import { import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types" import { ContainerRegistrationKeys, + MedusaV2Flag, isString, - MedusaV2Flag, promiseAll, + promiseAll, } from "@medusajs/utils" import { asValue } from "awilix" import { Express, NextFunction, Request, Response } from "express" @@ -24,6 +25,7 @@ import databaseLoader, { dataSource } from "./database" import defaultsLoader from "./defaults" import expressLoader from "./express" import featureFlagsLoader from "./feature-flags" +import { registerProjectWorkflows } from "./helpers/register-workflows" import medusaProjectApisLoader from "./load-medusa-project-apis" import Logger from "./logger" import loadMedusaApp, { mergeDefaultModules } from "./medusa-app" @@ -102,7 +104,14 @@ async function loadMedusaV2({ [ContainerRegistrationKeys.REMOTE_QUERY]: asValue(null), }) - const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ + // Workflows are registered before the app to allow modules to run workflows as part of bootstrapping + // e.g. the workflow engine will resume workflows that were running when the server was shut down + await registerProjectWorkflows({ rootDirectory, configModule }) + + const { + onApplicationShutdown: medusaAppOnApplicationShutdown, + onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown, + } = await loadMedusaApp({ configModule, container, }) @@ -149,11 +158,13 @@ async function loadMedusaV2({ await createDefaultsWorkflow(container).run() const shutdown = async () => { + await medusaAppOnApplicationShutdown() + await promiseAll([ container.dispose(), pgConnection?.context?.destroy(), expressShutdown(), - medusaAppOnApplicationShutdown() + medusaAppOnApplicationShutdown(), ]) } @@ -163,6 +174,7 @@ async function loadMedusaV2({ app: expressApp, pgConnection, shutdown, + prepareShutdown: medusaAppOnApplicationPrepareShutdown, } } @@ -177,6 +189,7 @@ export default async ({ app: Express pgConnection: unknown shutdown: () => Promise + prepareShutdown: () => Promise }> => { const configModule = loadConfig(rootDirectory) const featureFlagRouter = featureFlagsLoader(configModule, Logger) @@ -211,7 +224,11 @@ export default async ({ featureFlagRouter: asValue(featureFlagRouter), }) - const { shutdown: redisShutdown } = await redisLoader({ container, configModule, logger: Logger }) + const { shutdown: redisShutdown } = await redisLoader({ + container, + configModule, + logger: Logger, + }) const modelsActivity = Logger.activity(`Initializing models${EOL}`) track("MODELS_INIT_STARTED") @@ -271,7 +288,10 @@ export default async ({ track("MODULES_INIT_STARTED") // Move before services init once all modules are migrated and do not rely on core resources anymore - const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ + const { + onApplicationShutdown: medusaAppOnApplicationShutdown, + onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown, + } = await loadMedusaApp({ configModule, container, }) @@ -281,7 +301,10 @@ export default async ({ const expActivity = Logger.activity(`Initializing express${EOL}`) track("EXPRESS_INIT_STARTED") - const { shutdown: expressShutdown } = await expressLoader({ app: expressApp, configModule }) + const { shutdown: expressShutdown } = await expressLoader({ + app: expressApp, + configModule, + }) await passportLoader({ app: expressApp, configModule }) const exAct = Logger.success(expActivity, "Express intialized") || {} track("EXPRESS_INIT_COMPLETED", { duration: exAct.duration }) @@ -339,13 +362,14 @@ export default async ({ track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration }) async function shutdown() { + await medusaAppOnApplicationShutdown() + await promiseAll([ container.dispose(), dbConnection?.destroy(), pgConnection?.context?.destroy(), redisShutdown(), expressShutdown(), - medusaAppOnApplicationShutdown(), ]) } @@ -356,5 +380,6 @@ export default async ({ app: expressApp, pgConnection, shutdown, + prepareShutdown: async () => medusaAppOnApplicationPrepareShutdown(), } } diff --git a/packages/medusa/src/loaders/load-medusa-project-apis.ts b/packages/medusa/src/loaders/load-medusa-project-apis.ts index 8dbe27acef..d71327b4d4 100644 --- a/packages/medusa/src/loaders/load-medusa-project-apis.ts +++ b/packages/medusa/src/loaders/load-medusa-project-apis.ts @@ -1,12 +1,12 @@ import { promiseAll } from "@medusajs/utils" import { Express } from "express" import glob from "glob" -import _ from "lodash" import { trackInstallation } from "medusa-telemetry" import { EOL } from "os" import path from "path" import { ConfigModule, Logger, MedusaContainer } from "../types/global" import ScheduledJobsLoader from "./helpers/jobs" +import { getResolvedPlugins } from "./helpers/resolve-plugins" import { RoutesLoader } from "./helpers/routing" import { SubscriberLoader } from "./helpers/subscribers" import logger from "./logger" @@ -55,7 +55,6 @@ export default async ({ ) } await registerSubscribers(pluginDetails, container, activityId) - await registerWorkflows(pluginDetails) }) ) @@ -78,23 +77,6 @@ export default async ({ resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin")) } -function getResolvedPlugins( - rootDirectory: string, - configModule: ConfigModule, - extensionDirectoryPath = "dist" -): undefined | PluginDetails[] { - const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) - return [ - { - resolve: extensionDirectory, - name: MEDUSA_PROJECT_NAME, - id: createPluginId(MEDUSA_PROJECT_NAME), - options: configModule, - version: createFileContentHash(process.cwd(), `**`), - }, - ] -} - async function runLoaders( pluginDetails: PluginDetails, container: MedusaContainer @@ -191,21 +173,3 @@ async function registerSubscribers( true ).load() } - -/** - * import files from the workflows directory to run the registration of the wofklows - * @param pluginDetails - */ -async function registerWorkflows(pluginDetails: PluginDetails): Promise { - const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {}) - await Promise.all(files.map(async (file) => import(file))) -} - -// TODO: Create unique id for each plugin -function createPluginId(name: string): string { - return name -} - -function createFileContentHash(path, files): string { - return path + files -} diff --git a/packages/medusa/src/loaders/plugins.ts b/packages/medusa/src/loaders/plugins.ts index 97e2399837..f9afbaa893 100644 --- a/packages/medusa/src/loaders/plugins.ts +++ b/packages/medusa/src/loaders/plugins.ts @@ -3,13 +3,9 @@ import { promiseAll, upperCaseFirst, } from "@medusajs/utils" -import { aliasTo, asFunction, asValue, Lifetime } from "awilix" +import { Lifetime, aliasTo, asFunction, asValue } from "awilix" import { Express } from "express" -import fs from "fs" -import { sync as existsSync } from "fs-exists-cached" import glob from "glob" -import _ from "lodash" -import { createRequireFromPath } from "medusa-core-utils" import { OauthService } from "medusa-interfaces" import { trackInstallation } from "medusa-telemetry" import { EOL } from "os" @@ -41,6 +37,7 @@ import { registerAbstractFulfillmentServiceFromClass, registerPaymentProcessorFromClass, } from "./helpers/plugins" +import { getResolvedPlugins } from "./helpers/resolve-plugins" import { RoutesLoader } from "./helpers/routing" import { SubscriberLoader } from "./helpers/subscribers" import logger from "./logger" @@ -121,37 +118,6 @@ export default async ({ resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin")) } -function getResolvedPlugins( - rootDirectory: string, - configModule: ConfigModule, - extensionDirectoryPath = "dist" -): undefined | PluginDetails[] { - const { plugins } = configModule - - const resolved = plugins.map((plugin) => { - if (_.isString(plugin)) { - return resolvePlugin(plugin) - } - - const details = resolvePlugin(plugin.resolve) - details.options = plugin.options - - return details - }) - - const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath) - // Resolve user's project as a plugin for loading purposes - resolved.push({ - resolve: extensionDirectory, - name: MEDUSA_PROJECT_NAME, - id: createPluginId(MEDUSA_PROJECT_NAME), - options: configModule, - version: createFileContentHash(process.cwd(), `**`), - }) - - return resolved -} - export async function registerPluginModels({ rootDirectory, container, @@ -753,102 +719,3 @@ async function runSetupFunctions(pluginDetails: PluginDetails): Promise { }) ) } - -// TODO: Create unique id for each plugin -function createPluginId(name: string): string { - return name -} - -/** - * Finds the correct path for the plugin. If it is a local plugin it will be - * found in the plugins folder. Otherwise we will look for the plugin in the - * installed npm packages. - * @param {string} pluginName - the name of the plugin to find. Should match - * the name of the folder where the plugin is contained. - * @return {object} the plugin details - */ -function resolvePlugin(pluginName: string): { - resolve: string - id: string - name: string - options: Record - version: string -} { - // Only find plugins when we're not given an absolute path - if (!existsSync(pluginName)) { - // Find the plugin in the local plugins folder - const resolvedPath = path.resolve(`./plugins/${pluginName}`) - - if (existsSync(resolvedPath)) { - if (existsSync(`${resolvedPath}/package.json`)) { - const packageJSON = JSON.parse( - fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) - ) - const name = packageJSON.name || pluginName - // warnOnIncompatiblePeerDependency(name, packageJSON) - - return { - resolve: resolvedPath, - name, - id: createPluginId(name), - options: {}, - version: - packageJSON.version || createFileContentHash(resolvedPath, `**`), - } - } else { - // Make package.json a requirement for local plugins too - throw new Error(`Plugin ${pluginName} requires a package.json file`) - } - } - } - - const rootDir = path.resolve(".") - - /** - * Here we have an absolute path to an internal plugin, or a name of a module - * which should be located in node_modules. - */ - try { - const requireSource = - rootDir !== null - ? createRequireFromPath(`${rootDir}/:internal:`) - : require - - // If the path is absolute, resolve the directory of the internal plugin, - // otherwise resolve the directory containing the package.json - const resolvedPath = path.dirname( - requireSource.resolve(`${pluginName}/package.json`) - ) - - const packageJSON = JSON.parse( - fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`) - ) - // warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON) - - const computedResolvedPath = - resolvedPath + (process.env.DEV_MODE ? "/src" : "") - - // Add support for a plugin to output the build into a dist directory - const resolvedPathToDist = resolvedPath + "/dist" - const isDistExist = - resolvedPathToDist && - !process.env.DEV_MODE && - existsSync(resolvedPath + "/dist") - - return { - resolve: isDistExist ? resolvedPathToDist : computedResolvedPath, - id: createPluginId(packageJSON.name), - name: packageJSON.name, - options: {}, - version: packageJSON.version, - } - } catch (err) { - throw new Error( - `Unable to find plugin "${pluginName}". Perhaps you need to install its package?` - ) - } -} - -function createFileContentHash(path, files): string { - return path + files -} diff --git a/packages/modules-sdk/src/loaders/utils/load-internal.ts b/packages/modules-sdk/src/loaders/utils/load-internal.ts index bd6fb6abb2..dfe97b6bf9 100644 --- a/packages/modules-sdk/src/loaders/utils/load-internal.ts +++ b/packages/modules-sdk/src/loaders/utils/load-internal.ts @@ -141,6 +141,7 @@ export async function loadInternalModule( if (loaderOnly) { // The expectation is only to run the loader as standalone, so we do not need to register the service and we need to cleanup all services const service = container.resolve(registrationName) + await service.__hooks?.onApplicationPrepareShutdown() await service.__hooks?.onApplicationShutdown() } } diff --git a/packages/modules-sdk/src/medusa-app.ts b/packages/modules-sdk/src/medusa-app.ts index 80baeb7b32..9972dabcf6 100644 --- a/packages/modules-sdk/src/medusa-app.ts +++ b/packages/modules-sdk/src/medusa-app.ts @@ -204,6 +204,7 @@ export type MedusaAppOutput = { notFound?: Record> runMigrations: RunMigrationFn onApplicationShutdown: () => Promise + onApplicationPrepareShutdown: () => Promise } export type MedusaAppOptions = { @@ -243,6 +244,7 @@ async function MedusaApp_({ migrationOnly?: boolean } = {}): Promise { const sharedContainer_ = createMedusaContainer({}, sharedContainer) + const onApplicationShutdown = async () => { await promiseAll([ MedusaModule.onApplicationShutdown(), @@ -250,6 +252,10 @@ async function MedusaApp_({ ]) } + const onApplicationPrepareShutdown = async () => { + await promiseAll([MedusaModule.onApplicationPrepareShutdown()]) + } + const modules: MedusaModuleConfig = modulesConfig ?? ( @@ -312,6 +318,7 @@ async function MedusaApp_({ if (loaderOnly) { return { onApplicationShutdown, + onApplicationPrepareShutdown, modules: allModules, link: undefined, query: async () => { @@ -388,6 +395,7 @@ async function MedusaApp_({ return { onApplicationShutdown, + onApplicationPrepareShutdown, modules: allModules, link: remoteLink, query, diff --git a/packages/modules-sdk/src/medusa-module.ts b/packages/modules-sdk/src/medusa-module.ts index 2c03d1b43d..f3b4275449 100644 --- a/packages/modules-sdk/src/medusa-module.ts +++ b/packages/modules-sdk/src/medusa-module.ts @@ -136,6 +136,23 @@ export class MedusaModule { ) } + public static async onApplicationPrepareShutdown(): Promise { + await promiseAll( + [...MedusaModule.instances_.values()] + .map((instances) => { + return Object.values(instances).map((instance: IModuleService) => { + return instance.__hooks?.onApplicationPrepareShutdown + ?.bind(instance)() + .catch(() => { + // The module should handle this and log it + return void 0 + }) + }) + }) + .flat() + ) + } + public static clearInstances(): void { MedusaModule.instances_.clear() MedusaModule.modules_.clear() diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 0c31b17c37..a327734aeb 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -752,6 +752,9 @@ export class TransactionOrchestrator extends EventEmitter { await setStepFailure(error) }) + + const eventName = DistributedTransactionEvent.STEP_AWAITING + transaction.emit(eventName, { step, transaction }) }) ) } diff --git a/packages/orchestration/src/transaction/types.ts b/packages/orchestration/src/transaction/types.ts index 1ab854c72d..f113e447df 100644 --- a/packages/orchestration/src/transaction/types.ts +++ b/packages/orchestration/src/transaction/types.ts @@ -137,6 +137,7 @@ export enum DistributedTransactionEvent { STEP_BEGIN = "stepBegin", STEP_SUCCESS = "stepSuccess", STEP_FAILURE = "stepFailure", + STEP_AWAITING = "stepAwaiting", COMPENSATE_STEP_SUCCESS = "compensateStepSuccess", COMPENSATE_STEP_FAILURE = "compensateStepFailure", } @@ -166,6 +167,11 @@ export type DistributedTransactionEvents = { transaction: DistributedTransaction }) => void + onStepAwaiting?: (args: { + step: TransactionStep + transaction: DistributedTransaction + }) => void + onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void onCompensateStepSuccess?: (args: { diff --git a/packages/orchestration/src/workflow/global-workflow.ts b/packages/orchestration/src/workflow/global-workflow.ts index dd20ad8376..d806d4fa07 100644 --- a/packages/orchestration/src/workflow/global-workflow.ts +++ b/packages/orchestration/src/workflow/global-workflow.ts @@ -70,6 +70,10 @@ export class GlobalWorkflow extends WorkflowManager { transaction.once("stepFailure", this.subscribe.onStepFailure) } + if (this.subscribe.onStepAwaiting) { + transaction.once("stepAwaiting", this.subscribe.onStepAwaiting) + } + await orchestrator.resume(transaction) return transaction diff --git a/packages/orchestration/src/workflow/local-workflow.ts b/packages/orchestration/src/workflow/local-workflow.ts index a9a58691eb..8b7f3c03bc 100644 --- a/packages/orchestration/src/workflow/local-workflow.ts +++ b/packages/orchestration/src/workflow/local-workflow.ts @@ -244,6 +244,13 @@ export class LocalWorkflow { ) } + if (subscribe?.onStepAwaiting) { + transaction.on( + DistributedTransactionEvent.STEP_AWAITING, + eventWrapperMap.get("onStepAwaiting") + ) + } + if (subscribe?.onCompensateStepSuccess) { transaction.on( DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS, diff --git a/packages/types/src/common/config-module.ts b/packages/types/src/common/config-module.ts index 7964e1cfbb..d8a42eb34b 100644 --- a/packages/types/src/common/config-module.ts +++ b/packages/types/src/common/config-module.ts @@ -3,8 +3,8 @@ import { InternalModuleDeclaration, } from "../modules-sdk" -import { LoggerOptions } from "typeorm" import { RedisOptions } from "ioredis" +import { LoggerOptions } from "typeorm" /** * @interface @@ -744,3 +744,11 @@ export type ConfigModule = { */ featureFlags: Record } + +export type PluginDetails = { + resolve: string + name: string + id: string + options: Record + version: string +} diff --git a/packages/types/src/modules-sdk/index.ts b/packages/types/src/modules-sdk/index.ts index 17e1e1b461..0f372a3b44 100644 --- a/packages/types/src/modules-sdk/index.ts +++ b/packages/types/src/modules-sdk/index.ts @@ -297,5 +297,6 @@ export interface IModuleService { __hooks?: { onApplicationStart?: () => Promise onApplicationShutdown?: () => Promise + onApplicationPrepareShutdown?: () => Promise } } diff --git a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 87dcd93e7c..de1b6f2a7c 100644 --- a/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -483,11 +483,20 @@ export class WorkflowOrchestratorService { notify({ eventType: "onStepFailure", step, errors }) }, + onStepAwaiting: ({ step, transaction }) => { + customEventHandlers?.onStepAwaiting?.({ step, transaction }) + + notify({ eventType: "onStepAwaiting", step }) + }, onCompensateStepSuccess: ({ step, transaction }) => { const stepName = step.definition.action! const response = transaction.getContext().compensate[stepName] - customEventHandlers?.onStepSuccess?.({ step, transaction, response }) + customEventHandlers?.onCompensateStepSuccess?.({ + step, + transaction, + response, + }) notify({ eventType: "onCompensateStepSuccess", step, response }) }, diff --git a/packages/workflow-engine-redis/package.json b/packages/workflow-engine-redis/package.json index 4e6c27f262..dea00d9a19 100644 --- a/packages/workflow-engine-redis/package.json +++ b/packages/workflow-engine-redis/package.json @@ -53,7 +53,7 @@ "@mikro-orm/migrations": "5.9.7", "@mikro-orm/postgresql": "5.9.7", "awilix": "^8.0.0", - "bullmq": "^5.1.3", + "bullmq": "^5.4.2", "dotenv": "^16.4.5", "ioredis": "^5.3.2", "knex": "2.4.2" diff --git a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts index e4a7fdcdb1..7f6484394f 100644 --- a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -4,8 +4,13 @@ import { TransactionHandlerType, TransactionStep, } from "@medusajs/orchestration" -import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" +import { + ContainerLike, + Context, + Logger, + MedusaContainer, +} from "@medusajs/types" +import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" import { FlowRunOptions, MedusaWorkflow, @@ -75,6 +80,8 @@ export class WorkflowOrchestratorService { protected redisPublisher: Redis protected redisSubscriber: Redis private subscribers: Subscribers = new Map() + private activeStepsCount: number = 0 + private logger: Logger protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage @@ -83,15 +90,18 @@ export class WorkflowOrchestratorService { redisDistributedTransactionStorage, redisPublisher, redisSubscriber, + logger, }: { dataLoaderOnly: boolean redisDistributedTransactionStorage: RedisDistributedTransactionStorage workflowOrchestratorService: WorkflowOrchestratorService redisPublisher: Redis redisSubscriber: Redis + logger: Logger }) { this.redisPublisher = redisPublisher this.redisSubscriber = redisSubscriber + this.logger = logger redisDistributedTransactionStorage.setWorkflowOrchestratorService(this) @@ -113,6 +123,15 @@ export class WorkflowOrchestratorService { await this.redisDistributedTransactionStorage_.onApplicationShutdown() } + async onApplicationPrepareShutdown() { + // eslint-disable-next-line max-len + await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown() + + while (this.activeStepsCount > 0) { + await new Promise((resolve) => setTimeout(resolve, 1000)) + } + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, @@ -523,6 +542,7 @@ export class WorkflowOrchestratorService { onStepBegin: async ({ step, transaction }) => { customEventHandlers?.onStepBegin?.({ step, transaction }) + this.activeStepsCount++ await notify({ eventType: "onStepBegin", step }) }, @@ -533,8 +553,9 @@ export class WorkflowOrchestratorService { transaction ) customEventHandlers?.onStepSuccess?.({ step, transaction, response }) - await notify({ eventType: "onStepSuccess", step, response }) + + this.activeStepsCount-- }, onStepFailure: async ({ step, transaction }) => { const stepName = step.definition.action! @@ -543,14 +564,28 @@ export class WorkflowOrchestratorService { .filter((err) => err.action === stepName) customEventHandlers?.onStepFailure?.({ step, transaction, errors }) - await notify({ eventType: "onStepFailure", step, errors }) + + this.activeStepsCount-- + }, + onStepAwaiting: async ({ step, transaction }) => { + customEventHandlers?.onStepAwaiting?.({ step, transaction }) + + await notify({ eventType: "onStepAwaiting", step }) + + if (!step.definition.backgroundExecution) { + this.activeStepsCount-- + } }, onCompensateStepSuccess: async ({ step, transaction }) => { const stepName = step.definition.action! const response = transaction.getContext().compensate[stepName] - customEventHandlers?.onStepSuccess?.({ step, transaction, response }) + customEventHandlers?.onCompensateStepSuccess?.({ + step, + transaction, + response, + }) await notify({ eventType: "onCompensateStepSuccess", step, response }) }, diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index c53542703e..6dfe4f0614 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -59,6 +59,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { await this.workflowOrchestratorService_.onApplicationShutdown() await this.redisDisconnectHandler_() }, + onApplicationPrepareShutdown: async () => { + await this.workflowOrchestratorService_.onApplicationPrepareShutdown() + }, } @InjectManager("baseRepository_") diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index bd718f60ca..af19cd5904 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -64,9 +64,13 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt ) } + async onApplicationPrepareShutdown() { + // Close worker gracefully, i.e. wait for the current jobs to finish + await this.worker.close() + } + async onApplicationShutdown() { await this.queue.close() - await this.worker.close(true) } setWorkflowOrchestratorService(workflowOrchestratorService) { diff --git a/yarn.lock b/yarn.lock index cbdc7bdacd..d9b1716a2b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9221,7 +9221,7 @@ __metadata: "@mikro-orm/migrations": 5.9.7 "@mikro-orm/postgresql": 5.9.7 awilix: ^8.0.0 - bullmq: ^5.1.3 + bullmq: ^5.4.2 cross-env: ^5.2.1 dotenv: ^16.4.5 ioredis: ^5.3.2 @@ -22504,36 +22504,18 @@ __metadata: languageName: node linkType: hard -"bullmq@npm:^5.1.3": - version: 5.1.3 - resolution: "bullmq@npm:5.1.3" - dependencies: - cron-parser: ^4.6.0 - glob: ^8.0.3 - ioredis: ^5.3.2 - lodash: ^4.17.21 - msgpackr: ^1.10.1 - node-abort-controller: ^3.1.1 - semver: ^7.5.4 - tslib: ^2.0.0 - uuid: ^9.0.0 - checksum: dc2177dfd736b2d008ccab1ba9f77f80cc730ce6197c9ffa0f37327e1cf34bd8b97d83ee9f9008253ef0c0854bbd04f8c925889a3370a0899e8f5c7a34fd3ab3 - languageName: node - linkType: hard - "bullmq@npm:^5.4.2": - version: 5.4.2 - resolution: "bullmq@npm:5.4.2" + version: 5.7.3 + resolution: "bullmq@npm:5.7.3" dependencies: cron-parser: ^4.6.0 ioredis: ^5.3.2 - lodash: ^4.17.21 msgpackr: ^1.10.1 node-abort-controller: ^3.1.1 semver: ^7.5.4 tslib: ^2.0.0 uuid: ^9.0.0 - checksum: 01687a41bbacb646ab9cac181181c0cf18362f23b250c332125e0a701dacefb48ff20b030e095f24bd5e471f29d514bcc02779fd1f1c3cb830a3a5f1d0a39d75 + checksum: eade5853736a9ad606fe6f7b0d000585d7122ca3f5e4b71965cef448b77f3b8748f90ea420b90cf8e2fd7db737197946cc57ffbdb7ea8e8e4945875cc54b74d5 languageName: node linkType: hard