feat: v2 - add worker mode (#6739)

**What**
- Adds support for starting a Medusa process with a worker mode.
- The worker modes supported are "shared", "worker", "server"
- In "worker" mode, API routes are not registered and modules that need to run workers (e.g., event bus redis) can use the flag to conditionally start workers.
- In "server" mode, API routes are registered and workers are not started.
- In "shared" mode, API routes are registered and workers are started. This is great for development.
This commit is contained in:
Sebastian Rindom
2024-03-21 14:08:20 +01:00
committed by GitHub
parent 205573f5e3
commit 56481e683d
11 changed files with 141 additions and 58 deletions

View File

@@ -0,0 +1,8 @@
---
"@medusajs/medusa": patch
"@medusajs/event-bus-redis": patch
"@medusajs/modules-sdk": patch
"@medusajs/types": patch
---
feat: v2 - add worker mode

View File

@@ -42,11 +42,14 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
})
// Register our worker to handle emit calls
new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
})
const shouldStartWorker = moduleDeclaration.worker_mode !== "server"
if (shouldStartWorker) {
new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
})
}
}
/**

View File

@@ -19,39 +19,52 @@ export default async function ({ port, directory }) {
const app = express()
try {
const { dbConnection } = await loaders({ directory, expressApp: app })
const serverActivity = Logger.activity(`Creating server`)
const server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
}
Logger.success(serverActivity, `Server is ready on port: ${port}`)
track("CLI_START_COMPLETED")
})
)
const { dbConnection, configModule, container } = await loaders({
directory,
expressApp: app,
})
// Handle graceful shutdown
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
process.exit(0)
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)
process.exit(1)
const shouldStartServer =
configModule.projectConfig.worker_mode !== "worker"
let server
if (shouldStartServer) {
const serverActivity = Logger.activity(`Creating server`)
server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
}
Logger.success(serverActivity, `Server is ready on port: ${port}`)
track("CLI_START_COMPLETED")
})
)
// Handle graceful shutdown
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
process.exit(0)
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)
process.exit(1)
})
}
process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)
} else {
Logger.info("Running in worker mode, server will not be started.")
}
process.on("SIGTERM", gracefulShutDown)
process.on("SIGINT", gracefulShutDown)
scheduleJob(CRON_SCHEDULE, () => {
track("PING")
})
return { dbConnection, server }
return shouldStartServer ? { dbConnection, server } : { dbConnection }
} catch (err) {
Logger.error("Error starting server", err)
process.exit(1)

View File

@@ -1,4 +1,4 @@
import { getConfigFile } from "medusa-core-utils"
import { getConfigFile, isDefined } from "medusa-core-utils"
import { ConfigModule } from "../types/global"
import logger from "./logger"
@@ -58,11 +58,24 @@ export default (rootDirectory: string): ConfigModule => {
)
}
let worker_mode = configModule?.projectConfig?.worker_mode
if (!isDefined(worker_mode)) {
const env = process.env.MEDUSA_WORKER_MODE
if (isDefined(env)) {
if (env === "shared" || env === "worker" || env === "server") {
worker_mode = env
}
} else {
worker_mode = "shared"
}
}
return {
projectConfig: {
jwt_secret: jwt_secret ?? "supersecret",
cookie_secret: cookie_secret ?? "supersecret",
...configModule?.projectConfig,
worker_mode,
},
modules: configModule.modules ?? {},
featureFlags: configModule?.featureFlags ?? {},

View File

@@ -1,9 +1,9 @@
import { createDefaultsWorkflow } from "@medusajs/core-flows"
import {
InternalModuleDeclaration,
ModulesDefinition
ModulesDefinition,
} from "@medusajs/modules-sdk"
import { MODULE_RESOURCE_TYPE } from "@medusajs/types"
import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types"
import {
ContainerRegistrationKeys,
MedusaV2Flag,
@@ -91,14 +91,7 @@ async function loadMedusaV2({
}) {
const container = createMedusaContainer()
// Add additional information to context of request
expressApp.use((req: Request, res: Response, next: NextFunction) => {
const ipAddress = requestIp.getClientIp(req) as string
;(req as any).request_context = {
ip_address: ipAddress,
}
next()
})
const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker"
const pgConnection = await pgConnectionLoader({ container, configModule })
@@ -114,22 +107,33 @@ async function loadMedusaV2({
container,
})
await expressLoader({ app: expressApp, configModule })
if (shouldStartAPI) {
await expressLoader({ app: expressApp, configModule })
expressApp.use((req: Request, res: Response, next: NextFunction) => {
req.scope = container.createScope() as MedusaContainer
req.requestId = (req.headers["x-request-id"] as string) ?? v4()
next()
})
expressApp.use((req: Request, res: Response, next: NextFunction) => {
req.scope = container.createScope() as MedusaContainer
req.requestId = (req.headers["x-request-id"] as string) ?? v4()
next()
})
// TODO: Add Subscribers loader
// Add additional information to context of request
expressApp.use((req: Request, res: Response, next: NextFunction) => {
const ipAddress = requestIp.getClientIp(req) as string
;(req as any).request_context = {
ip_address: ipAddress,
}
next()
})
await apiLoader({
container,
app: expressApp,
configModule,
featureFlagRouter,
})
// TODO: Add Subscribers loader
await apiLoader({
container,
app: expressApp,
configModule,
featureFlagRouter,
})
}
await medusaProjectApisLoader({
rootDirectory,
@@ -142,6 +146,7 @@ async function loadMedusaV2({
await createDefaultsWorkflow(container).run()
return {
configModule,
container,
app: expressApp,
pgConnection,
@@ -153,6 +158,7 @@ export default async ({
expressApp,
isTest,
}: Options): Promise<{
configModule: ConfigModule
container: MedusaContainer
dbConnection?: Connection
app: Express
@@ -319,6 +325,7 @@ export default async ({
track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration })
return {
configModule,
container,
dbConnection,
app: expressApp,

View File

@@ -41,9 +41,19 @@ export default async ({
}: Options): Promise<void> => {
const resolved = getResolvedPlugins(rootDirectory, configModule) || []
const shouldStartAPI = configModule.projectConfig.worker_mode !== "worker"
await promiseAll(
resolved.map(async (pluginDetails) => {
await registerApi(pluginDetails, app, container, configModule, activityId)
if (shouldStartAPI) {
await registerApi(
pluginDetails,
app,
container,
configModule,
activityId
)
}
await registerSubscribers(pluginDetails, container, activityId)
await registerWorkflows(pluginDetails)
})

View File

@@ -168,6 +168,7 @@ export const loadMedusaApp = async (
}
const medusaApp = await MedusaApp({
workerMode: configModule.projectConfig.worker_mode,
modulesConfig: configModules,
servicesConfig: joinerConfig,
remoteFetchData: remoteQueryFetchData(container),

View File

@@ -71,7 +71,8 @@ export async function loadModules(
modulesConfig,
sharedContainer,
migrationOnly = false,
loaderOnly = false
loaderOnly = false,
workerMode: "shared" | "worker" | "server" = "server"
) {
const allModules = {}
@@ -113,6 +114,7 @@ export async function loadModules(
moduleExports,
migrationOnly,
loaderOnly,
workerMode,
})) as LoadedModule
if (loaderOnly) {
@@ -202,6 +204,7 @@ export type MedusaAppOutput = {
}
export type MedusaAppOptions = {
workerMode?: "shared" | "worker" | "server"
sharedContainer?: MedusaContainer
sharedResourcesConfig?: SharedResources
loadedModules?: LoadedModule[]
@@ -232,6 +235,7 @@ async function MedusaApp_({
onApplicationStartCb,
migrationOnly = false,
loaderOnly = false,
workerMode = "server",
}: MedusaAppOptions & { migrationOnly?: boolean } = {}): Promise<{
modules: Record<string, LoadedModule | LoadedModule[]>
link: RemoteLink | undefined
@@ -300,7 +304,8 @@ async function MedusaApp_({
modules,
sharedContainer_,
migrationOnly,
loaderOnly
loaderOnly,
workerMode
)
if (loaderOnly) {

View File

@@ -69,6 +69,7 @@ export type ModuleBootstrapOptions = {
* Forces the modules bootstrapper to only run the modules loaders and return prematurely
*/
loaderOnly?: boolean
workerMode?: "shared" | "worker" | "server"
}
export type LinkModuleBootstrapOptions = {
@@ -225,6 +226,7 @@ export class MedusaModule {
injectedDependencies,
migrationOnly,
loaderOnly,
workerMode,
}: ModuleBootstrapOptions): Promise<{
[key: string]: T
}> {
@@ -267,6 +269,7 @@ export class MedusaModule {
options: declaration?.options ?? declaration,
alias: declaration?.alias,
main: declaration?.main,
worker_mode: workerMode,
}
}
@@ -302,7 +305,7 @@ export class MedusaModule {
moduleResolutions,
logger: logger_,
migrationOnly,
loaderOnly
loaderOnly,
})
} catch (err) {
errorLoading(err)

View File

@@ -485,6 +485,25 @@ export type ProjectConfigOptions = {
* ```
*/
jobs_batch_size?: number
/**
* Configure the application's worker mode. The default value is `shared`.
* - Use `shared` if you want to run the application in a single process.
* - Use `worker` if you want to run the a worker process only.
* - Use `server` if you want to run the application server only.
*
* @example
* ```js title="medusa-config.js"
* module.exports = {
* projectConfig: {
* worker_mode: "shared"
* // ...
* },
* // ...
* }
* ```
*/
worker_mode?: "shared" | "worker" | "server"
}
/**

View File

@@ -48,6 +48,7 @@ export type InternalModuleDeclaration = {
* If the module is the main module for the key when multiple ones are registered
*/
main?: boolean
worker_mode?: "shared" | "worker" | "server"
}
export type ExternalModuleDeclaration = {