From ab22faaa52008ad952d50f0a3b6b1148515b10c9 Mon Sep 17 00:00:00 2001
From: Adrien de Peretti
Date: Wed, 14 May 2025 15:17:41 +0200
Subject: [PATCH] Chore/test runner improvements (#12439)

**What**

Make sure there are no open handles left and that the shutdown functions are
properly called. Refactor and improve the Medusa test runner. Make sure all
module instances are released and cleaned up.

**NOTE:** In a separate PR we can continue the investigation into the memory
growing over time while the tests execute.

---
 .changeset/sour-comics-relate.md              |   7 +
 .../src/__tests__/events.spec.ts              |  10 +-
 packages/medusa-test-utils/src/database.ts    | 194 +++++---
 packages/medusa-test-utils/src/events.ts      |   1 +
 .../medusa-test-utils/src/init-modules.ts     |   3 +-
 .../medusa-test-runner-utils/bootstrap-app.ts | 134 +++--
 .../src/medusa-test-runner-utils/use-db.ts    |   3 +-
 .../src/medusa-test-runner-utils/utils.ts     |  20 +
 .../src/medusa-test-runner.ts                 | 460 ++++++++++++------
 packages/medusa/src/commands/start.ts         |   2 +-
 .../index/src/services/data-synchronizer.ts   |   2 +-
 .../locking/src/providers/in-memory.ts        |   2 +-
 .../src/services/workflows-module.ts          |  16 +-
 13 files changed, 562 insertions(+), 292 deletions(-)
 create mode 100644 .changeset/sour-comics-relate.md

diff --git a/.changeset/sour-comics-relate.md b/.changeset/sour-comics-relate.md
new file mode 100644
index 0000000000..9b84c86abf
--- /dev/null
+++ b/.changeset/sour-comics-relate.md
@@ -0,0 +1,7 @@
+---
+"@medusajs/test-utils": patch
+"@medusajs/workflow-engine-inmemory": patch
+"@medusajs/workflow-engine-redis": patch
+---
+
+Chore/test runner improvements
diff --git a/packages/medusa-test-utils/src/__tests__/events.spec.ts b/packages/medusa-test-utils/src/__tests__/events.spec.ts
index 369e984d86..f2d65b2a82 100644
--- a/packages/medusa-test-utils/src/__tests__/events.spec.ts
+++ b/packages/medusa-test-utils/src/__tests__/events.spec.ts
@@ -31,7 +31,7 @@ describe("waitSubscribersExecution", () => {
   describe("with no existing listeners", () => {
     it("should resolve when event is fired before timeout", async () => {
       const waitPromise = waitSubscribersExecution(TEST_EVENT, eventBus as any)
-      setTimeout(() => eventBus.emit(TEST_EVENT, "test-data"), 100)
+      setTimeout(() => eventBus.emit(TEST_EVENT, "test-data"), 100).unref()

       jest.advanceTimersByTime(100)

@@ -69,7 +69,7 @@ describe("waitSubscribersExecution", () => {
   describe("with existing listeners", () => {
     it("should resolve when all listeners complete successfully", async () => {
       const listener = jest.fn().mockImplementation(() => {
-        return new Promise((resolve) => setTimeout(resolve, 200))
+        return new Promise((resolve) => setTimeout(resolve, 200).unref())
       })

       eventBus.eventEmitter_.on(TEST_EVENT, listener)
@@ -131,15 +131,15 @@ describe("waitSubscribersExecution", () => {
   describe("with multiple listeners", () => {
     it("should resolve when all listeners complete", async () => {
       const listener1 = jest.fn().mockImplementation(() => {
-        return new Promise((resolve) => setTimeout(resolve, 100))
+        return new Promise((resolve) => setTimeout(resolve, 100).unref())
       })

       const listener2 = jest.fn().mockImplementation(() => {
-        return new Promise((resolve) => setTimeout(resolve, 200))
+        return new Promise((resolve) => setTimeout(resolve, 200).unref())
       })

       const listener3 = jest.fn().mockImplementation(() => {
-        return new Promise((resolve) => setTimeout(resolve, 300))
+        return new Promise((resolve) => setTimeout(resolve, 300).unref())
       })

       eventBus.eventEmitter_.on(TEST_EVENT, listener1)
diff --git 
a/packages/medusa-test-utils/src/database.ts b/packages/medusa-test-utils/src/database.ts index c533272d51..8a0ee9d2bc 100644 --- a/packages/medusa-test-utils/src/database.ts +++ b/packages/medusa-test-utils/src/database.ts @@ -5,6 +5,8 @@ import { SqlEntityManager, } from "@mikro-orm/postgresql" import { createDatabase, dropDatabase } from "pg-god" +import { logger } from "@medusajs/framework/logger" +import { execOrTimeout } from "./medusa-test-runner-utils" const DB_HOST = process.env.DB_HOST ?? "localhost" const DB_USERNAME = process.env.DB_USERNAME ?? "" @@ -123,31 +125,43 @@ export function getMikroOrmWrapper({ schema: this.schema, }) - // Initializing the ORM - this.orm = await MikroORM.init(OrmConfig) - - this.manager = this.orm.em - try { - await this.orm.getSchemaGenerator().ensureDatabase() - } catch (err) { - console.log(err) - } + this.orm = await MikroORM.init(OrmConfig) + this.manager = this.orm.em - await this.manager?.execute( - `CREATE SCHEMA IF NOT EXISTS "${this.schema ?? "public"}";` - ) + try { + await this.orm.getSchemaGenerator().ensureDatabase() + } catch (err) { + logger.error("Error ensuring database:", err) + throw err + } - const pendingMigrations = await this.orm - .getMigrator() - .getPendingMigrations() + await this.manager?.execute( + `CREATE SCHEMA IF NOT EXISTS "${this.schema ?? "public"}";` + ) - if (pendingMigrations && pendingMigrations.length > 0) { - await this.orm + const pendingMigrations = await this.orm .getMigrator() - .up({ migrations: pendingMigrations.map((m) => m.name!) }) - } else { - await this.orm.schema.refreshDatabase() // ensure db exists and is fresh + .getPendingMigrations() + + if (pendingMigrations && pendingMigrations.length > 0) { + await this.orm + .getMigrator() + .up({ migrations: pendingMigrations.map((m) => m.name!) }) + } else { + await this.orm.schema.refreshDatabase() + } + } catch (error) { + if (this.orm) { + try { + await this.orm.close() + } catch (closeError) { + logger.error("Error closing ORM:", closeError) + } + } + this.orm = null + this.manager = null + throw error } }, @@ -156,20 +170,30 @@ export function getMikroOrmWrapper({ throw new Error("ORM not configured") } - await this.manager?.execute( - `DROP SCHEMA IF EXISTS "${this.schema ?? "public"}" CASCADE;` - ) - - await this.manager?.execute( - `CREATE SCHEMA IF NOT EXISTS "${this.schema ?? "public"}";` - ) - try { - await this.orm.close() - } catch {} + await this.manager?.execute( + `DROP SCHEMA IF EXISTS "${this.schema ?? "public"}" CASCADE;` + ) - this.orm = null - this.manager = null + await this.manager?.execute( + `CREATE SCHEMA IF NOT EXISTS "${this.schema ?? 
"public"}";` + ) + + const closePromise = this.orm.close() + + await execOrTimeout(closePromise) + } catch (error) { + logger.error("Error clearing database:", error) + try { + await this.orm?.close() + } catch (closeError) { + logger.error("Error during forced ORM close:", closeError) + } + throw error + } finally { + this.orm = null + this.manager = null + } }, } } @@ -178,10 +202,15 @@ export const dbTestUtilFactory = (): any => ({ pgConnection_: null, create: async function (dbName: string) { - await createDatabase( - { databaseName: dbName, errorIfExist: false }, - pgGodCredentials - ) + try { + await createDatabase( + { databaseName: dbName, errorIfExist: false }, + pgGodCredentials + ) + } catch (error) { + logger.error("Error creating database:", error) + throw error + } }, teardown: async function ({ schema }: { schema?: string } = {}) { @@ -189,50 +218,77 @@ export const dbTestUtilFactory = (): any => ({ return } - const runRawQuery = this.pgConnection_.raw.bind(this.pgConnection_) + try { + const runRawQuery = this.pgConnection_.raw.bind(this.pgConnection_) + schema ??= "public" - schema ??= "public" + await runRawQuery(`SET session_replication_role = 'replica';`) + const { rows: tableNames } = await runRawQuery(`SELECT table_name + FROM information_schema.tables + WHERE table_schema = '${schema}';`) - await runRawQuery(`SET session_replication_role = 'replica';`) - const { rows: tableNames } = await runRawQuery(`SELECT table_name - FROM information_schema.tables - WHERE table_schema = '${schema}';`) + const skipIndexPartitionPrefix = "cat_" + const mainPartitionTables = ["index_data", "index_relation"] + let hasIndexTables = false - const skipIndexPartitionPrefix = "cat_" - const mainPartitionTables = ["index_data", "index_relation"] - let hasIndexTables = false - for (const { table_name } of tableNames) { - if (mainPartitionTables.includes(table_name)) { - hasIndexTables = true + for (const { table_name } of tableNames) { + if (mainPartitionTables.includes(table_name)) { + hasIndexTables = true + } + + if ( + table_name.startsWith(skipIndexPartitionPrefix) || + mainPartitionTables.includes(table_name) + ) { + continue + } + + await runRawQuery(`DELETE FROM ${schema}."${table_name}";`) } - // Skipping index partition tables. 
- if ( - table_name.startsWith(skipIndexPartitionPrefix) || - mainPartitionTables.includes(table_name) - ) { - continue + if (hasIndexTables) { + await runRawQuery(`TRUNCATE TABLE ${schema}.index_data;`) + await runRawQuery(`TRUNCATE TABLE ${schema}.index_relation;`) } - await runRawQuery(`DELETE - FROM ${schema}."${table_name}";`) + await runRawQuery(`SET session_replication_role = 'origin';`) + } catch (error) { + logger.error("Error during database teardown:", error) + throw error } - - if (hasIndexTables) { - await runRawQuery(`TRUNCATE TABLE ${schema}.index_data;`) - await runRawQuery(`TRUNCATE TABLE ${schema}.index_relation;`) - } - - await runRawQuery(`SET session_replication_role = 'origin';`) }, shutdown: async function (dbName: string) { - await this.pgConnection_?.context?.destroy() - await this.pgConnection_?.destroy() + try { + const cleanupPromises: Promise[] = [] - return await dropDatabase( - { databaseName: dbName, errorIfNonExist: false }, - pgGodCredentials - ) + if (this.pgConnection_?.context) { + cleanupPromises.push( + execOrTimeout(this.pgConnection_.context.destroy()) + ) + } + + if (this.pgConnection_) { + cleanupPromises.push(execOrTimeout(this.pgConnection_.destroy())) + } + + await Promise.all(cleanupPromises) + + return await dropDatabase( + { databaseName: dbName, errorIfNonExist: false }, + pgGodCredentials + ) + } catch (error) { + logger.error("Error during database shutdown:", error) + try { + await this.pgConnection_?.context?.destroy() + await this.pgConnection_?.destroy() + } catch (cleanupError) { + logger.error("Error during forced cleanup:", cleanupError) + } + throw error + } finally { + this.pgConnection_ = null + } }, }) diff --git a/packages/medusa-test-utils/src/events.ts b/packages/medusa-test-utils/src/events.ts index 432d58e4fe..e19cc48955 100644 --- a/packages/medusa-test-utils/src/events.ts +++ b/packages/medusa-test-utils/src/events.ts @@ -25,6 +25,7 @@ export const waitSubscribersExecution = ( ) ) }, timeout) + timeoutId.unref() }) // If there are no existing listeners, resolve once the event happens. Otherwise, wrap the existing subscribers in a promise and resolve once they are done. diff --git a/packages/medusa-test-utils/src/init-modules.ts b/packages/medusa-test-utils/src/init-modules.ts index 3cc2109500..c15c2e6665 100644 --- a/packages/medusa-test-utils/src/init-modules.ts +++ b/packages/medusa-test-utils/src/init-modules.ts @@ -8,6 +8,7 @@ import { createPgConnection, promiseAll, } from "@medusajs/framework/utils" +import { logger } from "@medusajs/framework/logger" export interface InitModulesOptions { injectedDependencies?: Record @@ -69,7 +70,7 @@ export async function initModules({ ]) } else { if (!preventConnectionDestroyWarning) { - console.info( + logger.info( `You are using a custom shared connection. 
The connection won't be destroyed automatically.` ) } diff --git a/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.ts b/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.ts index 157355a9f5..e2b81759d7 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.ts +++ b/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.ts @@ -2,8 +2,9 @@ import express from "express" import getPort from "get-port" import { resolve } from "path" import { MedusaContainer } from "@medusajs/framework/types" -import { applyEnvVarsToProcess } from "./utils" +import { applyEnvVarsToProcess, execOrTimeout } from "./utils" import { promiseAll, GracefulShutdownServer } from "@medusajs/framework/utils" +import { logger } from "@medusajs/framework/logger" async function bootstrapApp({ cwd, @@ -14,18 +15,23 @@ async function bootstrapApp({ const loaders = require("@medusajs/medusa/loaders/index").default - const { container, shutdown } = await loaders({ - directory: resolve(cwd || process.cwd()), - expressApp: app, - }) + try { + const { container, shutdown } = await loaders({ + directory: resolve(cwd || process.cwd()), + expressApp: app, + }) - const PORT = process.env.PORT ? parseInt(process.env.PORT) : await getPort() + const PORT = process.env.PORT ? parseInt(process.env.PORT) : await getPort() - return { - shutdown, - container, - app, - port: PORT, + return { + shutdown, + container, + app, + port: PORT, + } + } catch (error) { + logger.error("Error bootstrapping app:", error) + throw error } } @@ -37,44 +43,84 @@ export async function startApp({ container: MedusaContainer port: number }> { - const { - app, - port, - container, - shutdown: medusaShutdown, - } = await bootstrapApp({ - cwd, - env, - }) + let expressServer: any + let medusaShutdown: () => Promise = async () => void 0 + let container: MedusaContainer - let expressServer + try { + const { + app, + port, + container: appContainer, + shutdown: appShutdown, + } = await bootstrapApp({ + cwd, + env, + }) - const shutdown = async () => { - await promiseAll([expressServer?.shutdown(), medusaShutdown()]) + container = appContainer + medusaShutdown = appShutdown - if (typeof global !== "undefined" && global?.gc) { - global.gc() + const shutdown = async () => { + try { + const shutdownPromise = promiseAll([ + expressServer?.shutdown(), + medusaShutdown(), + ]) + + await execOrTimeout(shutdownPromise) + + if (typeof global !== "undefined" && global?.gc) { + global.gc() + } + } catch (error) { + logger.error("Error during shutdown:", error) + try { + await expressServer?.shutdown() + await medusaShutdown() + } catch (cleanupError) { + logger.error("Error during forced cleanup:", cleanupError) + } + throw error + } } - } - return await new Promise((resolve, reject) => { - const server = app - .listen(port) - .on("error", async (err) => { - await shutdown() - return reject(err) - }) - .on("listening", () => { - process.send?.(port) - - resolve({ - shutdown, - container, - port, + return await new Promise((resolve, reject) => { + const server = app + .listen(port) + .on("error", async (err) => { + logger.error("Error starting server:", err) + await shutdown() + return reject(err) }) - }) + .on("listening", () => { + process.send?.(port) - // TODO: fix that once we find the appropriate place to put this util - expressServer = GracefulShutdownServer.create(server) - }) + resolve({ + shutdown, + container, + port, + }) + }) + + expressServer = GracefulShutdownServer.create(server) + }) 
+ } catch (error) { + logger.error("Error in startApp:", error) + if (expressServer) { + try { + await expressServer.shutdown() + } catch (cleanupError) { + logger.error("Error cleaning up express server:", cleanupError) + } + } + if (medusaShutdown) { + try { + await medusaShutdown() + } catch (cleanupError) { + logger.error("Error cleaning up medusa:", cleanupError) + } + } + throw error + } } diff --git a/packages/medusa-test-utils/src/medusa-test-runner-utils/use-db.ts b/packages/medusa-test-utils/src/medusa-test-runner-utils/use-db.ts index c8606be611..8204d840d4 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner-utils/use-db.ts +++ b/packages/medusa-test-utils/src/medusa-test-runner-utils/use-db.ts @@ -1,5 +1,6 @@ import type { MedusaAppLoader } from "@medusajs/framework" import { Logger, MedusaContainer } from "@medusajs/framework/types" +import { logger } from "@medusajs/framework/logger" import { ContainerRegistrationKeys, getResolvedPlugins, @@ -27,7 +28,7 @@ export async function migrateDatabase(appLoader: MedusaAppLoader) { try { await appLoader.runModulesMigrations() } catch (err) { - console.error("Something went wrong while running the migrations") + logger.error("Something went wrong while running the migrations") throw err } } diff --git a/packages/medusa-test-utils/src/medusa-test-runner-utils/utils.ts b/packages/medusa-test-utils/src/medusa-test-runner-utils/utils.ts index ca9c8b2864..8cbc3645e5 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner-utils/utils.ts +++ b/packages/medusa-test-utils/src/medusa-test-runner-utils/utils.ts @@ -5,3 +5,23 @@ export function applyEnvVarsToProcess(env?: Record) { Object.entries(env).forEach(([k, v]) => (process.env[k] = v)) } } + +/** + * Execute a function and return a promise that resolves when the function + * resolves or rejects when the function rejects or the timeout is reached. + * @param fn - The function to execute. + * @param timeout - The timeout in milliseconds. + * @returns A promise that resolves when the function resolves or rejects when the function rejects or the timeout is reached. + */ +export async function execOrTimeout( + fn: Promise | (() => Promise), + timeout: number = 5000 +) { + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error("Timeout")), timeout).unref() + }) + + const fnPromise = typeof fn === "function" ? 
fn() : fn + + return Promise.race([fnPromise, timeoutPromise]) +} diff --git a/packages/medusa-test-utils/src/medusa-test-runner.ts b/packages/medusa-test-utils/src/medusa-test-runner.ts index 28ba562a0e..2a42db7be4 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner.ts +++ b/packages/medusa-test-utils/src/medusa-test-runner.ts @@ -1,10 +1,11 @@ import { MedusaAppOutput } from "@medusajs/framework/modules-sdk" -import { ContainerLike, MedusaContainer } from "@medusajs/framework/types" +import { MedusaContainer } from "@medusajs/framework/types" import { ContainerRegistrationKeys, createMedusaContainer, } from "@medusajs/framework/utils" import { asValue } from "awilix" +import { logger } from "@medusajs/framework/logger" import { dbTestUtilFactory, getDatabaseURL } from "./database" import { applyEnvVarsToProcess, @@ -33,6 +34,254 @@ export interface MedusaSuiteOptions { getMedusaApp: () => MedusaAppOutput } +interface TestRunnerConfig { + moduleName?: string + env?: Record + dbName?: string + medusaConfigFile?: string + schema?: string + debug?: boolean + inApp?: boolean +} + +class MedusaTestRunner { + private dbName: string + private schema: string + private cwd: string + private env: Record + private debug: boolean + // @ts-ignore + private inApp: boolean + + private dbUtils: ReturnType + private dbConfig: { + dbName: string + clientUrl: string + schema: string + debug: boolean + } + + private globalContainer: MedusaContainer | null = null + private apiUtils: any = null + private loadedApplication: any = null + private shutdown: () => Promise = async () => void 0 + private isFirstTime = true + + constructor(config: TestRunnerConfig) { + const tempName = parseInt(process.env.JEST_WORKER_ID || "1") + const moduleName = + config.moduleName ?? Math.random().toString(36).substring(7) + this.dbName = + config.dbName ?? + `medusa-${moduleName.toLowerCase()}-integration-${tempName}` + this.schema = config.schema ?? "public" + this.cwd = config.medusaConfigFile ?? process.cwd() + this.env = config.env ?? {} + this.debug = config.debug ?? false + this.inApp = config.inApp ?? 
false + + this.dbUtils = dbTestUtilFactory() + this.dbConfig = { + dbName: this.dbName, + clientUrl: getDatabaseURL(this.dbName), + schema: this.schema, + debug: this.debug, + } + + this.setupProcessHandlers() + } + + private setupProcessHandlers(): void { + process.on("SIGTERM", async () => { + await this.cleanup() + process.exit(0) + }) + + process.on("SIGINT", async () => { + await this.cleanup() + process.exit(0) + }) + } + + private createApiProxy(): any { + return new Proxy( + {}, + { + get: (target, prop) => { + return this.apiUtils?.[prop] + }, + } + ) + } + + private createDbConnectionProxy(): any { + return new Proxy( + {}, + { + get: (target, prop) => { + return this.dbUtils.pgConnection_?.[prop] + }, + } + ) + } + + private async initializeDatabase(): Promise { + try { + logger.info(`Creating database ${this.dbName}`) + await this.dbUtils.create(this.dbName) + this.dbUtils.pgConnection_ = await initDb() + } catch (error) { + logger.error(`Error initializing database: ${error?.message}`) + await this.cleanup() + throw error + } + } + + private async setupApplication(): Promise { + const { container, MedusaAppLoader } = await import("@medusajs/framework") + const appLoader = new MedusaAppLoader() + + container.register({ + [ContainerRegistrationKeys.LOGGER]: asValue(logger), + }) + + await this.initializeDatabase() + + logger.info( + `Migrating database with core migrations and links ${this.dbName}` + ) + await migrateDatabase(appLoader) + await syncLinks(appLoader, this.cwd, container, logger) + await clearInstances() + + this.loadedApplication = await appLoader.load() + + try { + const { + shutdown, + container: appContainer, + port, + } = await startApp({ + cwd: this.cwd, + env: this.env, + }) + + this.globalContainer = appContainer + this.shutdown = async () => { + await shutdown() + if (this.apiUtils?.cancelToken?.source) { + this.apiUtils.cancelToken.source.cancel( + "Request canceled by shutdown" + ) + } + } + + const { default: axios } = (await import("axios")) as any + const cancelTokenSource = axios.CancelToken.source() + + this.apiUtils = axios.create({ + baseURL: `http://localhost:${port}`, + cancelToken: cancelTokenSource.token, + }) + + this.apiUtils.cancelToken = { source: cancelTokenSource } + } catch (error) { + logger.error(`Error starting the app: ${error?.message}`) + await this.cleanup() + throw error + } + } + + public async cleanup(): Promise { + try { + process.removeAllListeners("SIGTERM") + process.removeAllListeners("SIGINT") + + await this.dbUtils.shutdown(this.dbName) + await this.shutdown() + await clearInstances() + + if (this.apiUtils?.cancelToken?.source) { + this.apiUtils.cancelToken.source.cancel("Cleanup") + } + + if (this.globalContainer?.dispose) { + await this.globalContainer.dispose() + } + + this.apiUtils = null + this.loadedApplication = null + this.globalContainer = null + + if (global.gc) { + global.gc() + } + } catch (error) { + logger.error("Error during cleanup:", error?.message) + } + } + + public async beforeAll(): Promise { + try { + this.setupProcessHandlers() + await configLoaderOverride(this.cwd, this.dbConfig) + applyEnvVarsToProcess(this.env) + await this.setupApplication() + } catch (error) { + await this.cleanup() + throw error + } + } + + public async beforeEach(): Promise { + if (this.isFirstTime) { + this.isFirstTime = false + return + } + + await this.afterEach() + + const container = this.globalContainer as MedusaContainer + const copiedContainer = createMedusaContainer({}, container) + + try { + const { 
MedusaAppLoader } = await import("@medusajs/framework") + const medusaAppLoader = new MedusaAppLoader({ + container: copiedContainer, + }) + await medusaAppLoader.runModulesLoader() + } catch (error) { + await copiedContainer.dispose?.() + logger.error("Error running modules loaders:", error?.message) + throw error + } + } + + public async afterEach(): Promise { + try { + await this.dbUtils.teardown({ schema: this.schema }) + } catch (error) { + logger.error("Error tearing down database:", error?.message) + throw error + } + } + + public getOptions(): MedusaSuiteOptions { + return { + api: this.createApiProxy(), + dbConnection: this.createDbConnectionProxy(), + getMedusaApp: () => this.loadedApplication, + getContainer: () => this.globalContainer as MedusaContainer, + dbConfig: { + dbName: this.dbName, + schema: this.schema, + clientUrl: this.dbConfig.clientUrl, + }, + dbUtils: this.dbUtils, + } + } +} + export function medusaIntegrationTestRunner({ moduleName, dbName, @@ -52,172 +301,61 @@ export function medusaIntegrationTestRunner({ inApp?: boolean testSuite: (options: MedusaSuiteOptions) => void }) { - const tempName = parseInt(process.env.JEST_WORKER_ID || "1") - moduleName = moduleName ?? Math.random().toString(36).substring(7) - dbName ??= `medusa-${moduleName.toLowerCase()}-integration-${tempName}` - - let dbConfig = { + const runner = new MedusaTestRunner({ + moduleName, dbName, - clientUrl: getDatabaseURL(dbName), + medusaConfigFile, schema, + env, debug, - } - - const cwd = medusaConfigFile ?? process.cwd() - - let shutdown = async () => void 0 - const dbUtils = dbTestUtilFactory() - let globalContainer: ContainerLike - let apiUtils: any - let loadedApplication: any - - let options = { - api: new Proxy( - {}, - { - get: (target, prop) => { - return apiUtils[prop] - }, - } - ), - dbConnection: new Proxy( - {}, - { - get: (target, prop) => { - return dbUtils.pgConnection_[prop] - }, - } - ), - getMedusaApp: () => loadedApplication, - getContainer: () => globalContainer, - dbConfig: { - dbName, - schema, - clientUrl: dbConfig.clientUrl, - }, - dbUtils, - } as MedusaSuiteOptions - - let isFirstTime = true - - const beforeAll_ = async () => { - await configLoaderOverride(cwd, dbConfig) - applyEnvVarsToProcess(env) - - const { logger, container, MedusaAppLoader } = await import( - "@medusajs/framework" - ) - - const appLoader = new MedusaAppLoader() - container.register({ - [ContainerRegistrationKeys.LOGGER]: asValue(logger), - }) - - try { - logger.info(`Creating database ${dbName}`) - await dbUtils.create(dbName) - dbUtils.pgConnection_ = await initDb() - } catch (error) { - logger.error(`Error initializing database: ${error?.message}`) - throw error - } - - logger.info(`Migrating database with core migrations and links ${dbName}`) - await migrateDatabase(appLoader) - await syncLinks(appLoader, cwd, container, logger) - await clearInstances() - - let containerRes: MedusaContainer = container - let serverShutdownRes: () => any - let portRes: number - - loadedApplication = await appLoader.load() - - try { - const { - shutdown = () => void 0, - container: appContainer, - port, - } = await startApp({ - cwd, - env, - }) - - containerRes = appContainer - serverShutdownRes = shutdown - portRes = port - } catch (error) { - logger.error(`Error starting the app: error?.message`) - throw error - } - - /** - * Run application migrations and sync links when inside - * an application - */ - if (inApp) { - logger.info(`Migrating database with core migrations and links ${dbName}`) - await 
migrateDatabase(appLoader) - await syncLinks(appLoader, cwd, containerRes, logger) - } - - const { default: axios } = (await import("axios")) as any - - const cancelTokenSource = axios.CancelToken.source() - - globalContainer = containerRes - shutdown = async () => { - await serverShutdownRes() - cancelTokenSource.cancel("Request canceled by shutdown") - } - - apiUtils = axios.create({ - baseURL: `http://localhost:${portRes}`, - cancelToken: cancelTokenSource.token, - }) - } - - const beforeEach_ = async () => { - // The beforeAll already run everything, so lets not re run the loaders for the first iteration - if (isFirstTime) { - isFirstTime = false - return - } - - const container = options.getContainer() - const copiedContainer = createMedusaContainer({}, container) - - try { - const { MedusaAppLoader } = await import("@medusajs/framework") - - const medusaAppLoader = new MedusaAppLoader({ - container: copiedContainer, - }) - await medusaAppLoader.runModulesLoader() - } catch (error) { - console.error("Error runner modules loaders", error?.message) - throw error - } - } - - const afterEach_ = async () => { - try { - await dbUtils.teardown({ schema }) - } catch (error) { - console.error("Error tearing down database:", error?.message) - throw error - } - } + inApp, + }) return describe("", () => { - beforeAll(beforeAll_) - beforeEach(beforeEach_) - afterEach(afterEach_) - afterAll(async () => { - await dbUtils.shutdown(dbName) - await shutdown() + let testOptions: MedusaSuiteOptions + + beforeAll(async () => { + await runner.beforeAll() + testOptions = runner.getOptions() }) - testSuite(options!) + beforeEach(async () => { + await runner.beforeEach() + }) + + afterEach(async () => { + await runner.afterEach() + }) + + afterAll(async () => { + // Run main cleanup + await runner.cleanup() + + // Clean references to the test options + for (const key in testOptions) { + if (typeof testOptions[key] === "function") { + testOptions[key] = null + } else if ( + typeof testOptions[key] === "object" && + testOptions[key] !== null + ) { + Object.keys(testOptions[key]).forEach((k) => { + testOptions[key][k] = null + }) + testOptions[key] = null + } + } + + // Encourage garbage collection + // @ts-ignore + testOptions = null + + if (global.gc) { + global.gc() + } + }) + + // Run test suite with options + testSuite(runner.getOptions()) }) } diff --git a/packages/medusa/src/commands/start.ts b/packages/medusa/src/commands/start.ts index ba0dc48297..24f590b1ee 100644 --- a/packages/medusa/src/commands/start.ts +++ b/packages/medusa/src/commands/start.ts @@ -268,7 +268,7 @@ async function start(args: { if (!isShuttingDown) { cluster.fork() } else if (!isPresent(cluster.workers)) { - setTimeout(killMainProccess, 100) + setTimeout(killMainProccess, 100).unref() } }) diff --git a/packages/modules/index/src/services/data-synchronizer.ts b/packages/modules/index/src/services/data-synchronizer.ts index 07cb6b3ce4..2eeb5a8b65 100644 --- a/packages/modules/index/src/services/data-synchronizer.ts +++ b/packages/modules/index/src/services/data-synchronizer.ts @@ -328,7 +328,7 @@ export class DataSynchronizer { break } - await setTimeout(0) + await setTimeout(0, undefined, { ref: false }) } let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = { diff --git a/packages/modules/locking/src/providers/in-memory.ts b/packages/modules/locking/src/providers/in-memory.ts index 2374f8c442..5b5f05e99b 100644 --- a/packages/modules/locking/src/providers/in-memory.ts +++ 
b/packages/modules/locking/src/providers/in-memory.ts @@ -193,7 +193,7 @@ export class InMemoryLockingProvider implements ILockingProvider { setTimeout(() => { cancellationToken.cancelled = true reject(new Error("Timed-out acquiring lock.")) - }, seconds * 1000) + }, seconds * 1000).unref() }) } } diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 858151695f..e19b1bc7ff 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -70,14 +70,6 @@ export class WorkflowsModuleService< } __hooks = { - onApplicationShutdown: async () => { - await this.workflowOrchestratorService_.onApplicationShutdown() - await this.redisDisconnectHandler_() - clearInterval(this.clearTimeout_) - }, - onApplicationPrepareShutdown: async () => { - await this.workflowOrchestratorService_.onApplicationPrepareShutdown() - }, onApplicationStart: async () => { await this.workflowOrchestratorService_.onApplicationStart() @@ -88,6 +80,14 @@ export class WorkflowsModuleService< } catch {} }, 1000 * 60 * 60) }, + onApplicationPrepareShutdown: async () => { + await this.workflowOrchestratorService_.onApplicationPrepareShutdown() + }, + onApplicationShutdown: async () => { + await this.workflowOrchestratorService_.onApplicationShutdown() + await this.redisDisconnectHandler_() + clearInterval(this.clearTimeout_) + }, } static prepareFilters(filters: T & { q?: string }) {
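For readers skimming the diff above, the shutdown-hygiene pattern the patch leans on is small enough to restate on its own: cleanup work is raced against a timer, and the timer itself is `unref()`ed so it can never keep the Jest worker process alive. The sketch below mirrors the `execOrTimeout` helper added in `packages/medusa-test-utils/src/medusa-test-runner-utils/utils.ts`; the explicit generic `<T>` signature and the usage lines in the trailing comment are illustrative assumptions rather than verbatim excerpts from the patch.

```ts
// Minimal sketch of the pattern applied throughout this patch: bound every
// cleanup promise (ORM close, server shutdown, connection destroy) by a
// timeout so a hanging resource cannot block teardown forever.
export async function execOrTimeout<T>(
  fn: Promise<T> | (() => Promise<T>),
  timeout = 5000
): Promise<T> {
  const timeoutPromise = new Promise<never>((_, reject) => {
    // unref() lets the process exit even if this safety-valve timer is
    // still pending, so it cannot become an open handle of its own.
    setTimeout(() => reject(new Error("Timeout")), timeout).unref()
  })

  const fnPromise = typeof fn === "function" ? fn() : fn

  return Promise.race([fnPromise, timeoutPromise])
}

// Illustrative usage, analogous to the patched clearDatabase()/shutdown():
//   await execOrTimeout(this.orm.close())
//   await execOrTimeout(() => expressServer.shutdown())
```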