From 56481e683d33ff98f0d4c4e144873bb23f993c9c Mon Sep 17 00:00:00 2001 From: Sebastian Rindom Date: Thu, 21 Mar 2024 14:08:20 +0100 Subject: [PATCH] feat: v2 - add worker mode (#6739) **What** - Adds support for starting a Medusa process with a worker mode. - The worker modes supported are "shared", "worker", "server" - In "worker" mode, API routes are not registered and modules that need to run workers (e.g., event bus redis) can use the flag to conditionally start workers. - In "server" mode, API routes are registered and workers are not started. - In "shared" mode, API routes are registered and workers are started. This is great for development. --- .changeset/ninety-months-allow.md | 8 +++ .../src/services/event-bus-redis.ts | 13 ++-- packages/medusa/src/commands/start.js | 63 +++++++++++-------- packages/medusa/src/loaders/config.ts | 15 ++++- packages/medusa/src/loaders/index.ts | 53 +++++++++------- .../src/loaders/load-medusa-project-apis.ts | 12 +++- packages/medusa/src/loaders/medusa-app.ts | 1 + packages/modules-sdk/src/medusa-app.ts | 9 ++- packages/modules-sdk/src/medusa-module.ts | 5 +- packages/types/src/common/config-module.ts | 19 ++++++ packages/types/src/modules-sdk/index.ts | 1 + 11 files changed, 141 insertions(+), 58 deletions(-) create mode 100644 .changeset/ninety-months-allow.md diff --git a/.changeset/ninety-months-allow.md b/.changeset/ninety-months-allow.md new file mode 100644 index 0000000000..c1445a3021 --- /dev/null +++ b/.changeset/ninety-months-allow.md @@ -0,0 +1,8 @@ +--- +"@medusajs/medusa": patch +"@medusajs/event-bus-redis": patch +"@medusajs/modules-sdk": patch +"@medusajs/types": patch +--- + +feat: v2 - add worker mode diff --git a/packages/event-bus-redis/src/services/event-bus-redis.ts b/packages/event-bus-redis/src/services/event-bus-redis.ts index f9ad078748..a3e98c9b94 100644 --- a/packages/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/event-bus-redis/src/services/event-bus-redis.ts @@ -42,11 +42,14 @@ export default class RedisEventBusService extends AbstractEventBusModuleService }) // Register our worker to handle emit calls - new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, { - prefix: `${this.constructor.name}`, - ...(moduleOptions.workerOptions ?? {}), - connection: eventBusRedisConnection, - }) + const shouldStartWorker = moduleDeclaration.worker_mode !== "server" + if (shouldStartWorker) { + new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, { + prefix: `${this.constructor.name}`, + ...(moduleOptions.workerOptions ?? {}), + connection: eventBusRedisConnection, + }) + } } /** diff --git a/packages/medusa/src/commands/start.js b/packages/medusa/src/commands/start.js index 3f2161e803..41e70fc626 100644 --- a/packages/medusa/src/commands/start.js +++ b/packages/medusa/src/commands/start.js @@ -19,39 +19,52 @@ export default async function ({ port, directory }) { const app = express() try { - const { dbConnection } = await loaders({ directory, expressApp: app }) - const serverActivity = Logger.activity(`Creating server`) - const server = GracefulShutdownServer.create( - app.listen(port, (err) => { - if (err) { - return - } - Logger.success(serverActivity, `Server is ready on port: ${port}`) - track("CLI_START_COMPLETED") - }) - ) + const { dbConnection, configModule, container } = await loaders({ + directory, + expressApp: app, + }) - // Handle graceful shutdown - const gracefulShutDown = () => { - server - .shutdown() - .then(() => { - Logger.info("Gracefully stopping the server.") - process.exit(0) - }) - .catch((e) => { - Logger.error("Error received when shutting down the server.", e) - process.exit(1) + const shouldStartServer = + configModule.projectConfig.worker_mode !== "worker" + + let server + if (shouldStartServer) { + const serverActivity = Logger.activity(`Creating server`) + server = GracefulShutdownServer.create( + app.listen(port, (err) => { + if (err) { + return + } + Logger.success(serverActivity, `Server is ready on port: ${port}`) + track("CLI_START_COMPLETED") }) + ) + + // Handle graceful shutdown + const gracefulShutDown = () => { + server + .shutdown() + .then(() => { + Logger.info("Gracefully stopping the server.") + process.exit(0) + }) + .catch((e) => { + Logger.error("Error received when shutting down the server.", e) + process.exit(1) + }) + } + + process.on("SIGTERM", gracefulShutDown) + process.on("SIGINT", gracefulShutDown) + } else { + Logger.info("Running in worker mode, server will not be started.") } - process.on("SIGTERM", gracefulShutDown) - process.on("SIGINT", gracefulShutDown) scheduleJob(CRON_SCHEDULE, () => { track("PING") }) - return { dbConnection, server } + return shouldStartServer ? { dbConnection, server } : { dbConnection } } catch (err) { Logger.error("Error starting server", err) process.exit(1) diff --git a/packages/medusa/src/loaders/config.ts b/packages/medusa/src/loaders/config.ts index 23fa4af45c..eebcd488d1 100644 --- a/packages/medusa/src/loaders/config.ts +++ b/packages/medusa/src/loaders/config.ts @@ -1,4 +1,4 @@ -import { getConfigFile } from "medusa-core-utils" +import { getConfigFile, isDefined } from "medusa-core-utils" import { ConfigModule } from "../types/global" import logger from "./logger" @@ -58,11 +58,24 @@ export default (rootDirectory: string): ConfigModule => { ) } + let worker_mode = configModule?.projectConfig?.worker_mode + if (!isDefined(worker_mode)) { + const env = process.env.MEDUSA_WORKER_MODE + if (isDefined(env)) { + if (env === "shared" || env === "worker" || env === "server") { + worker_mode = env + } + } else { + worker_mode = "shared" + } + } + return { projectConfig: { jwt_secret: jwt_secret ?? "supersecret", cookie_secret: cookie_secret ?? "supersecret", ...configModule?.projectConfig, + worker_mode, }, modules: configModule.modules ?? {}, featureFlags: configModule?.featureFlags ?? {}, diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index fa2b9d798f..bbc0003650 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -1,9 +1,9 @@ import { createDefaultsWorkflow } from "@medusajs/core-flows" import { InternalModuleDeclaration, - ModulesDefinition + ModulesDefinition, } from "@medusajs/modules-sdk" -import { MODULE_RESOURCE_TYPE } from "@medusajs/types" +import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types" import { ContainerRegistrationKeys, MedusaV2Flag, @@ -91,14 +91,7 @@ async function loadMedusaV2({ }) { const container = createMedusaContainer() - // Add additional information to context of request - expressApp.use((req: Request, res: Response, next: NextFunction) => { - const ipAddress = requestIp.getClientIp(req) as string - ;(req as any).request_context = { - ip_address: ipAddress, - } - next() - }) + const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker" const pgConnection = await pgConnectionLoader({ container, configModule }) @@ -114,22 +107,33 @@ async function loadMedusaV2({ container, }) - await expressLoader({ app: expressApp, configModule }) + if (shouldStartAPI) { + await expressLoader({ app: expressApp, configModule }) - expressApp.use((req: Request, res: Response, next: NextFunction) => { - req.scope = container.createScope() as MedusaContainer - req.requestId = (req.headers["x-request-id"] as string) ?? v4() - next() - }) + expressApp.use((req: Request, res: Response, next: NextFunction) => { + req.scope = container.createScope() as MedusaContainer + req.requestId = (req.headers["x-request-id"] as string) ?? v4() + next() + }) - // TODO: Add Subscribers loader + // Add additional information to context of request + expressApp.use((req: Request, res: Response, next: NextFunction) => { + const ipAddress = requestIp.getClientIp(req) as string + ;(req as any).request_context = { + ip_address: ipAddress, + } + next() + }) - await apiLoader({ - container, - app: expressApp, - configModule, - featureFlagRouter, - }) + // TODO: Add Subscribers loader + + await apiLoader({ + container, + app: expressApp, + configModule, + featureFlagRouter, + }) + } await medusaProjectApisLoader({ rootDirectory, @@ -142,6 +146,7 @@ async function loadMedusaV2({ await createDefaultsWorkflow(container).run() return { + configModule, container, app: expressApp, pgConnection, @@ -153,6 +158,7 @@ export default async ({ expressApp, isTest, }: Options): Promise<{ + configModule: ConfigModule container: MedusaContainer dbConnection?: Connection app: Express @@ -319,6 +325,7 @@ export default async ({ track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration }) return { + configModule, container, dbConnection, app: expressApp, diff --git a/packages/medusa/src/loaders/load-medusa-project-apis.ts b/packages/medusa/src/loaders/load-medusa-project-apis.ts index 8f7b7dfa87..8dbe27acef 100644 --- a/packages/medusa/src/loaders/load-medusa-project-apis.ts +++ b/packages/medusa/src/loaders/load-medusa-project-apis.ts @@ -41,9 +41,19 @@ export default async ({ }: Options): Promise => { const resolved = getResolvedPlugins(rootDirectory, configModule) || [] + const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker" + await promiseAll( resolved.map(async (pluginDetails) => { - await registerApi(pluginDetails, app, container, configModule, activityId) + if (shouldStartAPI) { + await registerApi( + pluginDetails, + app, + container, + configModule, + activityId + ) + } await registerSubscribers(pluginDetails, container, activityId) await registerWorkflows(pluginDetails) }) diff --git a/packages/medusa/src/loaders/medusa-app.ts b/packages/medusa/src/loaders/medusa-app.ts index be5c826653..491fbe7da7 100644 --- a/packages/medusa/src/loaders/medusa-app.ts +++ b/packages/medusa/src/loaders/medusa-app.ts @@ -168,6 +168,7 @@ export const loadMedusaApp = async ( } const medusaApp = await MedusaApp({ + workerMode: configModule.projectConfig.worker_mode, modulesConfig: configModules, servicesConfig: joinerConfig, remoteFetchData: remoteQueryFetchData(container), diff --git a/packages/modules-sdk/src/medusa-app.ts b/packages/modules-sdk/src/medusa-app.ts index 9e4030fb5a..4f81cd2ad3 100644 --- a/packages/modules-sdk/src/medusa-app.ts +++ b/packages/modules-sdk/src/medusa-app.ts @@ -71,7 +71,8 @@ export async function loadModules( modulesConfig, sharedContainer, migrationOnly = false, - loaderOnly = false + loaderOnly = false, + workerMode: "shared" | "worker" | "server" = "server" ) { const allModules = {} @@ -113,6 +114,7 @@ export async function loadModules( moduleExports, migrationOnly, loaderOnly, + workerMode, })) as LoadedModule if (loaderOnly) { @@ -202,6 +204,7 @@ export type MedusaAppOutput = { } export type MedusaAppOptions = { + workerMode?: "shared" | "worker" | "server" sharedContainer?: MedusaContainer sharedResourcesConfig?: SharedResources loadedModules?: LoadedModule[] @@ -232,6 +235,7 @@ async function MedusaApp_({ onApplicationStartCb, migrationOnly = false, loaderOnly = false, + workerMode = "server", }: MedusaAppOptions & { migrationOnly?: boolean } = {}): Promise<{ modules: Record link: RemoteLink | undefined @@ -300,7 +304,8 @@ async function MedusaApp_({ modules, sharedContainer_, migrationOnly, - loaderOnly + loaderOnly, + workerMode ) if (loaderOnly) { diff --git a/packages/modules-sdk/src/medusa-module.ts b/packages/modules-sdk/src/medusa-module.ts index ec8d875437..dabe000523 100644 --- a/packages/modules-sdk/src/medusa-module.ts +++ b/packages/modules-sdk/src/medusa-module.ts @@ -69,6 +69,7 @@ export type ModuleBootstrapOptions = { * Forces the modules bootstrapper to only run the modules loaders and return prematurely */ loaderOnly?: boolean + workerMode?: "shared" | "worker" | "server" } export type LinkModuleBootstrapOptions = { @@ -225,6 +226,7 @@ export class MedusaModule { injectedDependencies, migrationOnly, loaderOnly, + workerMode, }: ModuleBootstrapOptions): Promise<{ [key: string]: T }> { @@ -267,6 +269,7 @@ export class MedusaModule { options: declaration?.options ?? declaration, alias: declaration?.alias, main: declaration?.main, + worker_mode: workerMode, } } @@ -302,7 +305,7 @@ export class MedusaModule { moduleResolutions, logger: logger_, migrationOnly, - loaderOnly + loaderOnly, }) } catch (err) { errorLoading(err) diff --git a/packages/types/src/common/config-module.ts b/packages/types/src/common/config-module.ts index 29a4903f10..76ba212426 100644 --- a/packages/types/src/common/config-module.ts +++ b/packages/types/src/common/config-module.ts @@ -485,6 +485,25 @@ export type ProjectConfigOptions = { * ``` */ jobs_batch_size?: number + + /** + * Configure the application's worker mode. The default value is `shared`. + * - Use `shared` if you want to run the application in a single process. + * - Use `worker` if you want to run the a worker process only. + * - Use `server` if you want to run the application server only. + * + * @example + * ```js title="medusa-config.js" + * module.exports = { + * projectConfig: { + * worker_mode: "shared" + * // ... + * }, + * // ... + * } + * ``` + */ + worker_mode?: "shared" | "worker" | "server" } /** diff --git a/packages/types/src/modules-sdk/index.ts b/packages/types/src/modules-sdk/index.ts index 56c7a02469..4ce4b5b08f 100644 --- a/packages/types/src/modules-sdk/index.ts +++ b/packages/types/src/modules-sdk/index.ts @@ -48,6 +48,7 @@ export type InternalModuleDeclaration = { * If the module is the main module for the key when multiple ones are registered */ main?: boolean + worker_mode?: "shared" | "worker" | "server" } export type ExternalModuleDeclaration = {