chore: medusa shutdown (#6865)

* chore: medusa shutdown

* continue

* use shutdown

* on application shutdown

* consume shutdown

* more connection close

* more cleanup

* more cleanup

* update lock

* revert package

* graceful shutdown

* Create yellow-apples-attack.md

* graceful shutdown

* graceful shutdown

---------

Co-authored-by: Sebastian Rindom <skrindom@gmail.com>
Co-authored-by: Riqwan Thamir <rmthamir@gmail.com>
This commit is contained in:
Adrien de Peretti
2024-03-29 13:22:29 +01:00
committed by GitHub
parent 0c0b425de7
commit 8fd1488938
20 changed files with 234 additions and 76 deletions

View File

@@ -0,0 +1,11 @@
---
"@medusajs/medusa": patch
"@medusajs/cache-redis": patch
"@medusajs/event-bus-redis": patch
"medusa-test-utils": patch
"@medusajs/modules-sdk": patch
"@medusajs/types": patch
"@medusajs/workflow-engine-redis": patch
---
chore: medusa shutdown

View File

@@ -23,6 +23,13 @@ class RedisCacheService implements ICacheService {
this.TTL = options.ttl ?? DEFAULT_CACHE_TIME
this.namespace = options.namespace || DEFAULT_NAMESPACE
}
__hooks = {
onApplicationShutdown: async () => {
this.redis.disconnect()
},
}
/**
* Set a key/value pair to the cache.
* If the ttl is 0 it will act like the value should not be cached at all.

View File

@@ -20,8 +20,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
protected readonly moduleOptions_: EventBusRedisModuleOptions
// eslint-disable-next-line max-len
protected readonly moduleDeclaration_: InternalModuleDeclaration
protected readonly eventBusRedisConnection_: Redis
protected queue_: Queue
protected bullWorker_: Worker
constructor(
{ logger, eventBusRedisConnection }: InjectedDependencies,
@@ -32,6 +34,8 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
// eslint-disable-next-line prefer-rest-params
super(...arguments)
this.eventBusRedisConnection_ = eventBusRedisConnection
this.moduleOptions_ = moduleOptions
this.logger_ = logger
@@ -44,14 +48,26 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
// Register our worker to handle emit calls
const shouldStartWorker = moduleDeclaration.worker_mode !== "server"
if (shouldStartWorker) {
new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, {
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
})
this.bullWorker_ = new Worker(
moduleOptions.queueName ?? "events-queue",
this.worker_,
{
prefix: `${this.constructor.name}`,
...(moduleOptions.workerOptions ?? {}),
connection: eventBusRedisConnection,
}
)
}
}
__hooks = {
onApplicationShutdown: async () => {
await this.bullWorker_.close(true)
await this.queue_.close()
this.eventBusRedisConnection_.disconnect()
},
}
/**
* Emit a single event
* @param {string} eventName - the name of the event to be process.

View File

@@ -4,7 +4,11 @@ import {
MedusaModuleConfig,
ModuleJoinerConfig,
} from "@medusajs/modules-sdk"
import { ContainerRegistrationKeys, ModulesSdkUtils } from "@medusajs/utils"
import {
ContainerRegistrationKeys,
ModulesSdkUtils,
promiseAll,
} from "@medusajs/utils"
export interface InitModulesOptions {
injectedDependencies?: Record<string, unknown>
@@ -48,8 +52,11 @@ export async function initModules({
async function shutdown() {
if (shouldDestroyConnectionAutomatically) {
await (sharedPgConnection as any).context?.destroy()
await (sharedPgConnection as any).destroy()
await promiseAll([
(sharedPgConnection as any).context?.destroy(),
(sharedPgConnection as any).destroy(),
medusaApp.onApplicationShutdown(),
])
} else {
if (!preventConnectionDestroyWarning) {
console.info(

View File

@@ -1,7 +1,8 @@
const path = require("path")
const express = require("express")
const getPort = require("get-port")
const { isObject } = require("@medusajs/utils")
const { isObject, promiseAll } = require("@medusajs/utils")
const { GracefulShutdownServer } = require("medusa-core-utils")
async function bootstrapApp({ cwd, env = {} } = {}) {
const app = express()
@@ -12,20 +13,17 @@ async function bootstrapApp({ cwd, env = {} } = {}) {
const loaders = require("@medusajs/medusa/dist/loaders").default
const { container, dbConnection, pgConnection, disposeResources } =
await loaders({
directory: path.resolve(cwd || process.cwd()),
expressApp: app,
isTest: false,
})
const { container, shutdown } = await loaders({
directory: path.resolve(cwd || process.cwd()),
expressApp: app,
isTest: false,
})
const PORT = await getPort()
return {
disposeResources,
shutdown,
container,
db: dbConnection,
pgConnection,
app,
port: PORT,
}
@@ -37,10 +35,16 @@ module.exports = {
env = {},
skipExpressListen = false,
} = {}) => {
const { app, port, container, db, pgConnection } = await bootstrapApp({
const {
app,
port,
container,
shutdown: medusaShutdown,
} = await bootstrapApp({
cwd,
env,
})
let expressServer
if (skipExpressListen) {
@@ -48,13 +52,7 @@ module.exports = {
}
const shutdown = async () => {
await Promise.all([
container.dispose(),
expressServer.close(),
db?.destroy(),
pgConnection?.context?.destroy(),
container.dispose(),
])
await promiseAll([expressServer.shutdown(), medusaShutdown()])
if (typeof global !== "undefined" && global?.gc) {
global.gc()
@@ -62,7 +60,7 @@ module.exports = {
}
return await new Promise((resolve, reject) => {
expressServer = app.listen(port, async (err) => {
const server = app.listen(port, async (err) => {
if (err) {
await shutdown()
return reject(err)
@@ -74,6 +72,8 @@ module.exports = {
port,
})
})
expressServer = GracefulShutdownServer.create(server)
})
},
}

View File

@@ -54,6 +54,7 @@ const dbTestUtilFactory = (): any => ({
shutdown: async function (dbName: string) {
await this.db_?.destroy()
await this.pgConnection_?.context?.destroy()
await this.pgConnection_?.destroy()
return await dropDatabase(
{ databaseName: dbName, errorIfNonExist: false },
@@ -116,7 +117,7 @@ export function medusaIntegrationTestRunner({
const cwd = process.cwd()
let shutdown: () => Promise<void>
let shutdown = async () => void 0
let dbUtils = dbTestUtilFactory()
let container: ContainerLike
let apiUtils: any
@@ -142,6 +143,8 @@ export function medusaIntegrationTestRunner({
getContainer: () => container,
} as MedusaSuiteOptions
let isFirstTime = true
const beforeAll_ = async () => {
await dbUtils.create(dbName)
const { dbDataSource, pgConnection } = await initDb({
@@ -156,7 +159,7 @@ export function medusaIntegrationTestRunner({
dbUtils.pgConnection_ = pgConnection
const {
shutdown: shutdown_,
shutdown: serverShutdown,
container: container_,
port,
} = await startBootstrapApp({
@@ -164,13 +167,26 @@ export function medusaIntegrationTestRunner({
env,
})
apiUtils = axios.create({ baseURL: `http://localhost:${port}` })
const cancelTokenSource = axios.CancelToken.source()
apiUtils = axios.create({
baseURL: `http://localhost:${port}`,
cancelToken: cancelTokenSource.token,
})
container = container_
shutdown = shutdown_
shutdown = async () => {
await serverShutdown()
cancelTokenSource.cancel("Request canceled by shutdown")
}
}
const beforeEach_ = async () => {
// The beforeAll already run everything, so lets not re run the loaders for the first iteration
if (isFirstTime) {
isFirstTime = false
return
}
const container = options.getContainer()
const copiedContainer = createMedusaContainer({}, container)

View File

@@ -54,7 +54,10 @@ export default async function ({ port, cpus, directory }) {
const app = express()
const { dbConnection } = await loaders({ directory, expressApp: app })
const { dbConnection, shutdown } = await loaders({
directory,
expressApp: app,
})
const serverActivity = Logger.activity(`Creating server`)
const server = GracefulShutdownServer.create(
app.listen(port, (err) => {
@@ -70,7 +73,9 @@ export default async function ({ port, cpus, directory }) {
server
.shutdown()
.then(() => {
process.exit(0)
shutdown().then(() => {
process.exit(0)
})
})
.catch((e) => {
process.exit(1)

View File

@@ -19,14 +19,13 @@ export default async function ({ port, directory }) {
const app = express()
try {
const { dbConnection, configModule, container } = await loaders({
const { dbConnection, shutdown } = await loaders({
directory,
expressApp: app,
})
let server
const serverActivity = Logger.activity(`Creating server`)
server = GracefulShutdownServer.create(
const server = GracefulShutdownServer.create(
app.listen(port, (err) => {
if (err) {
return
@@ -42,7 +41,9 @@ export default async function ({ port, directory }) {
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
process.exit(0)
shutdown().then(() => {
process.exit(0)
})
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)

View File

@@ -11,7 +11,10 @@ type Options = {
configModule: ConfigModule
}
export default async ({ app, configModule }: Options): Promise<Express> => {
export default async ({ app, configModule }: Options): Promise<{
app: Express,
shutdown: () => Promise<void>
}> => {
let sameSite: string | boolean = false
let secure = false
if (
@@ -38,9 +41,11 @@ export default async ({ app, configModule }: Options): Promise<Express> => {
store: null,
}
let redisClient
if (configModule?.projectConfig?.redis_url) {
const RedisStore = createStore(session)
const redisClient = new Redis(
redisClient = new Redis(
configModule.projectConfig.redis_url,
configModule.projectConfig.redis_options ?? {}
)
@@ -63,5 +68,9 @@ export default async ({ app, configModule }: Options): Promise<Express> => {
res.status(200).send("OK")
})
return app
const shutdown = async () => {
redisClient?.disconnect()
}
return { app, shutdown }
}

View File

@@ -6,8 +6,8 @@ import {
import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types"
import {
ContainerRegistrationKeys,
MedusaV2Flag,
isString,
MedusaV2Flag, promiseAll,
} from "@medusajs/utils"
import { asValue } from "awilix"
import { Express, NextFunction, Request, Response } from "express"
@@ -97,18 +97,21 @@ async function loadMedusaV2({
container.register({
[ContainerRegistrationKeys.LOGGER]: asValue(Logger),
featureFlagRouter: asValue(featureFlagRouter),
[ContainerRegistrationKeys.FEATURE_FLAG_ROUTER]: asValue(featureFlagRouter),
[ContainerRegistrationKeys.CONFIG_MODULE]: asValue(configModule),
["remoteQuery"]: asValue(null),
[ContainerRegistrationKeys.REMOTE_QUERY]: asValue(null),
})
await loadMedusaApp({
const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({
configModule,
container,
})
let expressShutdown = async () => {}
if (shouldStartAPI) {
await expressLoader({ app: expressApp, configModule })
const { shutdown } = await expressLoader({ app: expressApp, configModule })
expressShutdown = shutdown
expressApp.use((req: Request, res: Response, next: NextFunction) => {
req.scope = container.createScope() as MedusaContainer
@@ -145,11 +148,21 @@ async function loadMedusaV2({
await createDefaultsWorkflow(container).run()
const shutdown = async () => {
await promiseAll([
container.dispose(),
pgConnection?.context?.destroy(),
expressShutdown(),
medusaAppOnApplicationShutdown()
])
}
return {
configModule,
container,
app: expressApp,
pgConnection,
shutdown,
}
}
@@ -163,6 +176,7 @@ export default async ({
dbConnection?: Connection
app: Express
pgConnection: unknown
shutdown: () => Promise<void>
}> => {
const configModule = loadConfig(rootDirectory)
const featureFlagRouter = featureFlagsLoader(configModule, Logger)
@@ -197,7 +211,7 @@ export default async ({
featureFlagRouter: asValue(featureFlagRouter),
})
await redisLoader({ container, configModule, logger: Logger })
const { shutdown: redisShutdown } = await redisLoader({ container, configModule, logger: Logger })
const modelsActivity = Logger.activity(`Initializing models${EOL}`)
track("MODELS_INIT_STARTED")
@@ -257,7 +271,7 @@ export default async ({
track("MODULES_INIT_STARTED")
// Move before services init once all modules are migrated and do not rely on core resources anymore
await loadMedusaApp({
const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({
configModule,
container,
})
@@ -267,7 +281,7 @@ export default async ({
const expActivity = Logger.activity(`Initializing express${EOL}`)
track("EXPRESS_INIT_STARTED")
await expressLoader({ app: expressApp, configModule })
const { shutdown: expressShutdown } = await expressLoader({ app: expressApp, configModule })
await passportLoader({ app: expressApp, configModule })
const exAct = Logger.success(expActivity, "Express intialized") || {}
track("EXPRESS_INIT_COMPLETED", { duration: exAct.duration })
@@ -324,11 +338,23 @@ export default async ({
Logger.success(searchActivity, "Indexing event emitted") || {}
track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration })
async function shutdown() {
await promiseAll([
container.dispose(),
dbConnection?.destroy(),
pgConnection?.context?.destroy(),
redisShutdown(),
expressShutdown(),
medusaAppOnApplicationShutdown(),
])
}
return {
configModule,
container,
dbConnection,
app: expressApp,
pgConnection,
shutdown,
}
}

View File

@@ -249,7 +249,6 @@ export async function runModulesLoader({
}
container: MedusaContainer
}): Promise<void> {
const featureFlagRouter = container.resolve<FlagRouter>("featureFlagRouter")
const injectedDependencies = {
[ContainerRegistrationKeys.PG_CONNECTION]: container.resolve(
ContainerRegistrationKeys.PG_CONNECTION

View File

@@ -15,23 +15,25 @@ async function redisLoader({
container,
configModule,
logger,
}: Options): Promise<void> {
}: Options): Promise<{ shutdown: () => Promise<void> }> {
let client!: Redis | FakeRedis
if (configModule.projectConfig.redis_url) {
const redisClient = new Redis(configModule.projectConfig.redis_url, {
client = new Redis(configModule.projectConfig.redis_url, {
// Lazy connect to properly handle connection errors
lazyConnect: true,
...(configModule.projectConfig.redis_options ?? {}),
})
try {
await redisClient.connect()
await client.connect()
logger?.info(`Connection to Redis established`)
} catch (err) {
logger?.error(`An error occurred while connecting to Redis:${EOL} ${err}`)
}
container.register({
redisClient: asValue(redisClient),
redisClient: asValue(client),
})
} else {
if (process.env.NODE_ENV === "production") {
@@ -43,12 +45,18 @@ async function redisLoader({
logger.info("Using fake Redis")
// Economical way of dealing with redis clients
const client = new FakeRedis()
client = new FakeRedis()
container.register({
redisClient: asValue(client),
})
}
return {
shutdown: async () => {
client.disconnect()
},
}
}
export default redisLoader

View File

@@ -20,7 +20,9 @@ export async function loadInternalModule(
migrationOnly?: boolean,
loaderOnly?: boolean
): Promise<{ error?: Error } | void> {
const registrationName = resolution.definition.registrationName
const registrationName = !loaderOnly
? resolution.definition.registrationName
: resolution.definition.registrationName + "__loaderOnly"
const { resources } =
resolution.moduleDeclaration as InternalModuleDeclaration
@@ -122,11 +124,8 @@ export async function loadInternalModule(
}
}
if (loaderOnly) {
return
}
const moduleService = loadedModule.service
container.register({
[registrationName]: asFunction((cradle) => {
;(moduleService as any).__type = MedusaModuleType
@@ -137,6 +136,12 @@ export async function loadInternalModule(
)
}).singleton(),
})
if (loaderOnly) {
// The expectation is only to run the loader as standalone, so we do not need to register the service and we need to cleanup all services
const service = container.resolve(registrationName)
await service.__hooks?.onApplicationShutdown()
}
}
export async function loadModuleMigrations(

View File

@@ -22,6 +22,7 @@ import {
isObject,
isString,
ModulesSdkUtils,
promiseAll,
} from "@medusajs/utils"
import { asValue } from "awilix"
import {
@@ -202,6 +203,7 @@ export type MedusaAppOutput = {
entitiesMap?: Record<string, any>
notFound?: Record<string, Record<string, string>>
runMigrations: RunMigrationFn
onApplicationShutdown: () => Promise<void>
}
export type MedusaAppOptions = {
@@ -237,19 +239,16 @@ async function MedusaApp_({
migrationOnly = false,
loaderOnly = false,
workerMode = "server",
}: MedusaAppOptions & { migrationOnly?: boolean } = {}): Promise<{
modules: Record<string, LoadedModule | LoadedModule[]>
link: RemoteLink | undefined
query: (
query: string | RemoteJoinerQuery | object,
variables?: Record<string, unknown>,
options?: RemoteJoinerOptions
) => Promise<any>
entitiesMap?: Record<string, any>
notFound?: Record<string, Record<string, string>>
runMigrations: RunMigrationFn
}> {
}: MedusaAppOptions & {
migrationOnly?: boolean
} = {}): Promise<MedusaAppOutput> {
const sharedContainer_ = createMedusaContainer({}, sharedContainer)
const onApplicationShutdown = async () => {
await promiseAll([
MedusaModule.onApplicationShutdown(),
sharedContainer_.dispose(),
])
}
const modules: MedusaModuleConfig =
modulesConfig ??
@@ -312,6 +311,7 @@ async function MedusaApp_({
if (loaderOnly) {
return {
onApplicationShutdown,
modules: allModules,
link: undefined,
query: async () => {
@@ -387,6 +387,7 @@ async function MedusaApp_({
}
return {
onApplicationShutdown,
modules: allModules,
link: remoteLink,
query,

View File

@@ -15,6 +15,7 @@ import {
} from "@medusajs/types"
import {
createMedusaContainer,
promiseAll,
simpleHash,
stringifyCircular,
} from "@medusajs/utils"
@@ -116,6 +117,20 @@ export class MedusaModule {
}
}
}
public static async onApplicationShutdown(): Promise<void> {
await promiseAll(
[...MedusaModule.instances_.values()].map(instances => {
return Object.values(instances).map((instance: IModuleService) => {
return instance.__hooks?.onApplicationShutdown
?.bind(instance)()
.catch(() => {
// The module should handle this and log it
return void 0
})
})
}).flat()
)
}
public static clearInstances(): void {
MedusaModule.instances_.clear()

View File

@@ -294,5 +294,6 @@ export interface IModuleService {
*/
__hooks?: {
onApplicationStart?: () => Promise<void>
onApplicationShutdown?: () => Promise<void>
}
}

View File

@@ -63,6 +63,12 @@ export default async ({
redisPublisher: asValue(redisPublisher),
redisSubscriber: asValue(redisSubscriber),
redisQueueName: asValue(queueName),
redisDisconnectHandler: asValue(async () => {
connection.disconnect()
workerConnection.disconnect()
redisPublisher.disconnect()
redisSubscriber.disconnect()
}),
})
}

View File

@@ -5,7 +5,7 @@ import {
TransactionStep,
} from "@medusajs/orchestration"
import { ContainerLike, Context, MedusaContainer } from "@medusajs/types"
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
import {
FlowRunOptions,
MedusaWorkflow,
@@ -76,6 +76,8 @@ export class WorkflowOrchestratorService {
protected redisSubscriber: Redis
private subscribers: Subscribers = new Map()
protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage
constructor({
redisDistributedTransactionStorage,
redisPublisher,
@@ -92,6 +94,9 @@ export class WorkflowOrchestratorService {
redisDistributedTransactionStorage.setWorkflowOrchestratorService(this)
DistributedTransaction.setStorage(redisDistributedTransactionStorage)
this.redisDistributedTransactionStorage_ =
redisDistributedTransactionStorage
this.redisSubscriber.on("message", async (_, message) => {
const { instanceId, data } = JSON.parse(message)
@@ -99,6 +104,10 @@ export class WorkflowOrchestratorService {
})
}
async onApplicationShutdown() {
await this.redisDistributedTransactionStorage_.onApplicationShutdown()
}
@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,

View File

@@ -9,9 +9,9 @@ import {
import {
InjectManager,
InjectSharedContext,
isString,
MedusaContext,
MedusaError,
isString,
} from "@medusajs/utils"
import type {
IWorkflowEngineService,
@@ -26,30 +26,41 @@ type InjectedDependencies = {
baseRepository: DAL.RepositoryService
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>
workflowOrchestratorService: WorkflowOrchestratorService
redisDisconnectHandler: () => Promise<void>
}
export class WorkflowsModuleService implements IWorkflowEngineService {
protected baseRepository_: DAL.RepositoryService
protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
protected workflowOrchestratorService_: WorkflowOrchestratorService
protected redisDisconnectHandler_: () => Promise<void>
constructor(
{
baseRepository,
workflowExecutionService,
workflowOrchestratorService,
redisDisconnectHandler,
}: InjectedDependencies,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
this.baseRepository_ = baseRepository
this.workflowExecutionService_ = workflowExecutionService
this.workflowOrchestratorService_ = workflowOrchestratorService
this.redisDisconnectHandler_ = redisDisconnectHandler
}
__joinerConfig(): ModuleJoinerConfig {
return joinerConfig
}
__hooks = {
onApplicationShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationShutdown()
await this.redisDisconnectHandler_()
},
}
@InjectManager("baseRepository_")
async retrieveWorkflowExecution(
idOrObject:
@@ -191,7 +202,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
transactionId: string,
@MedusaContext() context: Context = {}
) {
return this.workflowOrchestratorService_.getRunningTransaction(
return await this.workflowOrchestratorService_.getRunningTransaction(
workflowId,
transactionId,
context
@@ -211,7 +222,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
},
@MedusaContext() context: Context = {}
) {
return this.workflowOrchestratorService_.setStepSuccess(
return await this.workflowOrchestratorService_.setStepSuccess(
{
idempotencyKey,
stepResponse,
@@ -234,7 +245,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
},
@MedusaContext() context: Context = {}
) {
return this.workflowOrchestratorService_.setStepFailure(
return await this.workflowOrchestratorService_.setStepFailure(
{
idempotencyKey,
stepResponse,

View File

@@ -64,6 +64,11 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
)
}
async onApplicationShutdown() {
await this.queue.close()
await this.worker.close(true)
}
setWorkflowOrchestratorService(workflowOrchestratorService) {
this.workflowOrchestratorService_ = workflowOrchestratorService
}