diff --git a/.changeset/yellow-apples-attack.md b/.changeset/yellow-apples-attack.md new file mode 100644 index 0000000000..be9455f631 --- /dev/null +++ b/.changeset/yellow-apples-attack.md @@ -0,0 +1,11 @@ +--- +"@medusajs/medusa": patch +"@medusajs/cache-redis": patch +"@medusajs/event-bus-redis": patch +"medusa-test-utils": patch +"@medusajs/modules-sdk": patch +"@medusajs/types": patch +"@medusajs/workflow-engine-redis": patch +--- + +chore: medusa shutdown diff --git a/packages/cache-redis/src/services/redis-cache.ts b/packages/cache-redis/src/services/redis-cache.ts index b562ef8415..7872a6567d 100644 --- a/packages/cache-redis/src/services/redis-cache.ts +++ b/packages/cache-redis/src/services/redis-cache.ts @@ -23,6 +23,13 @@ class RedisCacheService implements ICacheService { this.TTL = options.ttl ?? DEFAULT_CACHE_TIME this.namespace = options.namespace || DEFAULT_NAMESPACE } + + __hooks = { + onApplicationShutdown: async () => { + this.redis.disconnect() + }, + } + /** * Set a key/value pair to the cache. * If the ttl is 0 it will act like the value should not be cached at all. diff --git a/packages/event-bus-redis/src/services/event-bus-redis.ts b/packages/event-bus-redis/src/services/event-bus-redis.ts index a3e98c9b94..270349dfb1 100644 --- a/packages/event-bus-redis/src/services/event-bus-redis.ts +++ b/packages/event-bus-redis/src/services/event-bus-redis.ts @@ -20,8 +20,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService protected readonly moduleOptions_: EventBusRedisModuleOptions // eslint-disable-next-line max-len protected readonly moduleDeclaration_: InternalModuleDeclaration + protected readonly eventBusRedisConnection_: Redis protected queue_: Queue + protected bullWorker_: Worker constructor( { logger, eventBusRedisConnection }: InjectedDependencies, @@ -32,6 +34,8 @@ export default class RedisEventBusService extends AbstractEventBusModuleService // eslint-disable-next-line prefer-rest-params super(...arguments) + this.eventBusRedisConnection_ = eventBusRedisConnection + this.moduleOptions_ = moduleOptions this.logger_ = logger @@ -44,14 +48,26 @@ export default class RedisEventBusService extends AbstractEventBusModuleService // Register our worker to handle emit calls const shouldStartWorker = moduleDeclaration.worker_mode !== "server" if (shouldStartWorker) { - new Worker(moduleOptions.queueName ?? "events-queue", this.worker_, { - prefix: `${this.constructor.name}`, - ...(moduleOptions.workerOptions ?? {}), - connection: eventBusRedisConnection, - }) + this.bullWorker_ = new Worker( + moduleOptions.queueName ?? "events-queue", + this.worker_, + { + prefix: `${this.constructor.name}`, + ...(moduleOptions.workerOptions ?? {}), + connection: eventBusRedisConnection, + } + ) } } + __hooks = { + onApplicationShutdown: async () => { + await this.bullWorker_.close(true) + await this.queue_.close() + this.eventBusRedisConnection_.disconnect() + }, + } + /** * Emit a single event * @param {string} eventName - the name of the event to be process. diff --git a/packages/medusa-test-utils/src/init-modules.ts b/packages/medusa-test-utils/src/init-modules.ts index 4e4c4ad02b..e7f0e73756 100644 --- a/packages/medusa-test-utils/src/init-modules.ts +++ b/packages/medusa-test-utils/src/init-modules.ts @@ -4,7 +4,11 @@ import { MedusaModuleConfig, ModuleJoinerConfig, } from "@medusajs/modules-sdk" -import { ContainerRegistrationKeys, ModulesSdkUtils } from "@medusajs/utils" +import { + ContainerRegistrationKeys, + ModulesSdkUtils, + promiseAll, +} from "@medusajs/utils" export interface InitModulesOptions { injectedDependencies?: Record @@ -48,8 +52,11 @@ export async function initModules({ async function shutdown() { if (shouldDestroyConnectionAutomatically) { - await (sharedPgConnection as any).context?.destroy() - await (sharedPgConnection as any).destroy() + await promiseAll([ + (sharedPgConnection as any).context?.destroy(), + (sharedPgConnection as any).destroy(), + medusaApp.onApplicationShutdown(), + ]) } else { if (!preventConnectionDestroyWarning) { console.info( diff --git a/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.js b/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.js index 09029ebef8..552acb4dc1 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.js +++ b/packages/medusa-test-utils/src/medusa-test-runner-utils/bootstrap-app.js @@ -1,7 +1,8 @@ const path = require("path") const express = require("express") const getPort = require("get-port") -const { isObject } = require("@medusajs/utils") +const { isObject, promiseAll } = require("@medusajs/utils") +const { GracefulShutdownServer } = require("medusa-core-utils") async function bootstrapApp({ cwd, env = {} } = {}) { const app = express() @@ -12,20 +13,17 @@ async function bootstrapApp({ cwd, env = {} } = {}) { const loaders = require("@medusajs/medusa/dist/loaders").default - const { container, dbConnection, pgConnection, disposeResources } = - await loaders({ - directory: path.resolve(cwd || process.cwd()), - expressApp: app, - isTest: false, - }) + const { container, shutdown } = await loaders({ + directory: path.resolve(cwd || process.cwd()), + expressApp: app, + isTest: false, + }) const PORT = await getPort() return { - disposeResources, + shutdown, container, - db: dbConnection, - pgConnection, app, port: PORT, } @@ -37,10 +35,16 @@ module.exports = { env = {}, skipExpressListen = false, } = {}) => { - const { app, port, container, db, pgConnection } = await bootstrapApp({ + const { + app, + port, + container, + shutdown: medusaShutdown, + } = await bootstrapApp({ cwd, env, }) + let expressServer if (skipExpressListen) { @@ -48,13 +52,7 @@ module.exports = { } const shutdown = async () => { - await Promise.all([ - container.dispose(), - expressServer.close(), - db?.destroy(), - pgConnection?.context?.destroy(), - container.dispose(), - ]) + await promiseAll([expressServer.shutdown(), medusaShutdown()]) if (typeof global !== "undefined" && global?.gc) { global.gc() @@ -62,7 +60,7 @@ module.exports = { } return await new Promise((resolve, reject) => { - expressServer = app.listen(port, async (err) => { + const server = app.listen(port, async (err) => { if (err) { await shutdown() return reject(err) @@ -74,6 +72,8 @@ module.exports = { port, }) }) + + expressServer = GracefulShutdownServer.create(server) }) }, } diff --git a/packages/medusa-test-utils/src/medusa-test-runner.ts b/packages/medusa-test-utils/src/medusa-test-runner.ts index bdb4a6462a..5c221e64d3 100644 --- a/packages/medusa-test-utils/src/medusa-test-runner.ts +++ b/packages/medusa-test-utils/src/medusa-test-runner.ts @@ -54,6 +54,7 @@ const dbTestUtilFactory = (): any => ({ shutdown: async function (dbName: string) { await this.db_?.destroy() await this.pgConnection_?.context?.destroy() + await this.pgConnection_?.destroy() return await dropDatabase( { databaseName: dbName, errorIfNonExist: false }, @@ -116,7 +117,7 @@ export function medusaIntegrationTestRunner({ const cwd = process.cwd() - let shutdown: () => Promise + let shutdown = async () => void 0 let dbUtils = dbTestUtilFactory() let container: ContainerLike let apiUtils: any @@ -142,6 +143,8 @@ export function medusaIntegrationTestRunner({ getContainer: () => container, } as MedusaSuiteOptions + let isFirstTime = true + const beforeAll_ = async () => { await dbUtils.create(dbName) const { dbDataSource, pgConnection } = await initDb({ @@ -156,7 +159,7 @@ export function medusaIntegrationTestRunner({ dbUtils.pgConnection_ = pgConnection const { - shutdown: shutdown_, + shutdown: serverShutdown, container: container_, port, } = await startBootstrapApp({ @@ -164,13 +167,26 @@ export function medusaIntegrationTestRunner({ env, }) - apiUtils = axios.create({ baseURL: `http://localhost:${port}` }) + const cancelTokenSource = axios.CancelToken.source() + apiUtils = axios.create({ + baseURL: `http://localhost:${port}`, + cancelToken: cancelTokenSource.token, + }) container = container_ - shutdown = shutdown_ + shutdown = async () => { + await serverShutdown() + cancelTokenSource.cancel("Request canceled by shutdown") + } } const beforeEach_ = async () => { + // The beforeAll already run everything, so lets not re run the loaders for the first iteration + if (isFirstTime) { + isFirstTime = false + return + } + const container = options.getContainer() const copiedContainer = createMedusaContainer({}, container) diff --git a/packages/medusa/src/commands/start-cluster.js b/packages/medusa/src/commands/start-cluster.js index 19c8c0fb30..efd9026257 100644 --- a/packages/medusa/src/commands/start-cluster.js +++ b/packages/medusa/src/commands/start-cluster.js @@ -54,7 +54,10 @@ export default async function ({ port, cpus, directory }) { const app = express() - const { dbConnection } = await loaders({ directory, expressApp: app }) + const { dbConnection, shutdown } = await loaders({ + directory, + expressApp: app, + }) const serverActivity = Logger.activity(`Creating server`) const server = GracefulShutdownServer.create( app.listen(port, (err) => { @@ -70,7 +73,9 @@ export default async function ({ port, cpus, directory }) { server .shutdown() .then(() => { - process.exit(0) + shutdown().then(() => { + process.exit(0) + }) }) .catch((e) => { process.exit(1) diff --git a/packages/medusa/src/commands/start.js b/packages/medusa/src/commands/start.js index 9e421fed45..e3bbda43cc 100644 --- a/packages/medusa/src/commands/start.js +++ b/packages/medusa/src/commands/start.js @@ -19,14 +19,13 @@ export default async function ({ port, directory }) { const app = express() try { - const { dbConnection, configModule, container } = await loaders({ + const { dbConnection, shutdown } = await loaders({ directory, expressApp: app, }) - let server const serverActivity = Logger.activity(`Creating server`) - server = GracefulShutdownServer.create( + const server = GracefulShutdownServer.create( app.listen(port, (err) => { if (err) { return @@ -42,7 +41,9 @@ export default async function ({ port, directory }) { .shutdown() .then(() => { Logger.info("Gracefully stopping the server.") - process.exit(0) + shutdown().then(() => { + process.exit(0) + }) }) .catch((e) => { Logger.error("Error received when shutting down the server.", e) diff --git a/packages/medusa/src/loaders/express.ts b/packages/medusa/src/loaders/express.ts index cd68d6ea94..b5feb0adac 100644 --- a/packages/medusa/src/loaders/express.ts +++ b/packages/medusa/src/loaders/express.ts @@ -11,7 +11,10 @@ type Options = { configModule: ConfigModule } -export default async ({ app, configModule }: Options): Promise => { +export default async ({ app, configModule }: Options): Promise<{ + app: Express, + shutdown: () => Promise +}> => { let sameSite: string | boolean = false let secure = false if ( @@ -38,9 +41,11 @@ export default async ({ app, configModule }: Options): Promise => { store: null, } + let redisClient + if (configModule?.projectConfig?.redis_url) { const RedisStore = createStore(session) - const redisClient = new Redis( + redisClient = new Redis( configModule.projectConfig.redis_url, configModule.projectConfig.redis_options ?? {} ) @@ -63,5 +68,9 @@ export default async ({ app, configModule }: Options): Promise => { res.status(200).send("OK") }) - return app + const shutdown = async () => { + redisClient?.disconnect() + } + + return { app, shutdown } } diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index bbc0003650..e7a0f0779b 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -6,8 +6,8 @@ import { import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types" import { ContainerRegistrationKeys, - MedusaV2Flag, isString, + MedusaV2Flag, promiseAll, } from "@medusajs/utils" import { asValue } from "awilix" import { Express, NextFunction, Request, Response } from "express" @@ -97,18 +97,21 @@ async function loadMedusaV2({ container.register({ [ContainerRegistrationKeys.LOGGER]: asValue(Logger), - featureFlagRouter: asValue(featureFlagRouter), + [ContainerRegistrationKeys.FEATURE_FLAG_ROUTER]: asValue(featureFlagRouter), [ContainerRegistrationKeys.CONFIG_MODULE]: asValue(configModule), - ["remoteQuery"]: asValue(null), + [ContainerRegistrationKeys.REMOTE_QUERY]: asValue(null), }) - await loadMedusaApp({ + const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ configModule, container, }) + let expressShutdown = async () => {} + if (shouldStartAPI) { - await expressLoader({ app: expressApp, configModule }) + const { shutdown } = await expressLoader({ app: expressApp, configModule }) + expressShutdown = shutdown expressApp.use((req: Request, res: Response, next: NextFunction) => { req.scope = container.createScope() as MedusaContainer @@ -145,11 +148,21 @@ async function loadMedusaV2({ await createDefaultsWorkflow(container).run() + const shutdown = async () => { + await promiseAll([ + container.dispose(), + pgConnection?.context?.destroy(), + expressShutdown(), + medusaAppOnApplicationShutdown() + ]) + } + return { configModule, container, app: expressApp, pgConnection, + shutdown, } } @@ -163,6 +176,7 @@ export default async ({ dbConnection?: Connection app: Express pgConnection: unknown + shutdown: () => Promise }> => { const configModule = loadConfig(rootDirectory) const featureFlagRouter = featureFlagsLoader(configModule, Logger) @@ -197,7 +211,7 @@ export default async ({ featureFlagRouter: asValue(featureFlagRouter), }) - await redisLoader({ container, configModule, logger: Logger }) + const { shutdown: redisShutdown } = await redisLoader({ container, configModule, logger: Logger }) const modelsActivity = Logger.activity(`Initializing models${EOL}`) track("MODELS_INIT_STARTED") @@ -257,7 +271,7 @@ export default async ({ track("MODULES_INIT_STARTED") // Move before services init once all modules are migrated and do not rely on core resources anymore - await loadMedusaApp({ + const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({ configModule, container, }) @@ -267,7 +281,7 @@ export default async ({ const expActivity = Logger.activity(`Initializing express${EOL}`) track("EXPRESS_INIT_STARTED") - await expressLoader({ app: expressApp, configModule }) + const { shutdown: expressShutdown } = await expressLoader({ app: expressApp, configModule }) await passportLoader({ app: expressApp, configModule }) const exAct = Logger.success(expActivity, "Express intialized") || {} track("EXPRESS_INIT_COMPLETED", { duration: exAct.duration }) @@ -324,11 +338,23 @@ export default async ({ Logger.success(searchActivity, "Indexing event emitted") || {} track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration }) + async function shutdown() { + await promiseAll([ + container.dispose(), + dbConnection?.destroy(), + pgConnection?.context?.destroy(), + redisShutdown(), + expressShutdown(), + medusaAppOnApplicationShutdown(), + ]) + } + return { configModule, container, dbConnection, app: expressApp, pgConnection, + shutdown, } } diff --git a/packages/medusa/src/loaders/medusa-app.ts b/packages/medusa/src/loaders/medusa-app.ts index 45df6f0f71..e1311b3270 100644 --- a/packages/medusa/src/loaders/medusa-app.ts +++ b/packages/medusa/src/loaders/medusa-app.ts @@ -249,7 +249,6 @@ export async function runModulesLoader({ } container: MedusaContainer }): Promise { - const featureFlagRouter = container.resolve("featureFlagRouter") const injectedDependencies = { [ContainerRegistrationKeys.PG_CONNECTION]: container.resolve( ContainerRegistrationKeys.PG_CONNECTION diff --git a/packages/medusa/src/loaders/redis.ts b/packages/medusa/src/loaders/redis.ts index 45d12c46b2..5d89aad40d 100644 --- a/packages/medusa/src/loaders/redis.ts +++ b/packages/medusa/src/loaders/redis.ts @@ -15,23 +15,25 @@ async function redisLoader({ container, configModule, logger, -}: Options): Promise { +}: Options): Promise<{ shutdown: () => Promise }> { + let client!: Redis | FakeRedis + if (configModule.projectConfig.redis_url) { - const redisClient = new Redis(configModule.projectConfig.redis_url, { + client = new Redis(configModule.projectConfig.redis_url, { // Lazy connect to properly handle connection errors lazyConnect: true, ...(configModule.projectConfig.redis_options ?? {}), }) try { - await redisClient.connect() + await client.connect() logger?.info(`Connection to Redis established`) } catch (err) { logger?.error(`An error occurred while connecting to Redis:${EOL} ${err}`) } container.register({ - redisClient: asValue(redisClient), + redisClient: asValue(client), }) } else { if (process.env.NODE_ENV === "production") { @@ -43,12 +45,18 @@ async function redisLoader({ logger.info("Using fake Redis") // Economical way of dealing with redis clients - const client = new FakeRedis() + client = new FakeRedis() container.register({ redisClient: asValue(client), }) } + + return { + shutdown: async () => { + client.disconnect() + }, + } } export default redisLoader diff --git a/packages/modules-sdk/src/loaders/utils/load-internal.ts b/packages/modules-sdk/src/loaders/utils/load-internal.ts index bbe11a5122..2c4db5cfc3 100644 --- a/packages/modules-sdk/src/loaders/utils/load-internal.ts +++ b/packages/modules-sdk/src/loaders/utils/load-internal.ts @@ -20,7 +20,9 @@ export async function loadInternalModule( migrationOnly?: boolean, loaderOnly?: boolean ): Promise<{ error?: Error } | void> { - const registrationName = resolution.definition.registrationName + const registrationName = !loaderOnly + ? resolution.definition.registrationName + : resolution.definition.registrationName + "__loaderOnly" const { resources } = resolution.moduleDeclaration as InternalModuleDeclaration @@ -122,11 +124,8 @@ export async function loadInternalModule( } } - if (loaderOnly) { - return - } - const moduleService = loadedModule.service + container.register({ [registrationName]: asFunction((cradle) => { ;(moduleService as any).__type = MedusaModuleType @@ -137,6 +136,12 @@ export async function loadInternalModule( ) }).singleton(), }) + + if (loaderOnly) { + // The expectation is only to run the loader as standalone, so we do not need to register the service and we need to cleanup all services + const service = container.resolve(registrationName) + await service.__hooks?.onApplicationShutdown() + } } export async function loadModuleMigrations( diff --git a/packages/modules-sdk/src/medusa-app.ts b/packages/modules-sdk/src/medusa-app.ts index b01f135e85..80baeb7b32 100644 --- a/packages/modules-sdk/src/medusa-app.ts +++ b/packages/modules-sdk/src/medusa-app.ts @@ -22,6 +22,7 @@ import { isObject, isString, ModulesSdkUtils, + promiseAll, } from "@medusajs/utils" import { asValue } from "awilix" import { @@ -202,6 +203,7 @@ export type MedusaAppOutput = { entitiesMap?: Record notFound?: Record> runMigrations: RunMigrationFn + onApplicationShutdown: () => Promise } export type MedusaAppOptions = { @@ -237,19 +239,16 @@ async function MedusaApp_({ migrationOnly = false, loaderOnly = false, workerMode = "server", -}: MedusaAppOptions & { migrationOnly?: boolean } = {}): Promise<{ - modules: Record - link: RemoteLink | undefined - query: ( - query: string | RemoteJoinerQuery | object, - variables?: Record, - options?: RemoteJoinerOptions - ) => Promise - entitiesMap?: Record - notFound?: Record> - runMigrations: RunMigrationFn -}> { +}: MedusaAppOptions & { + migrationOnly?: boolean +} = {}): Promise { const sharedContainer_ = createMedusaContainer({}, sharedContainer) + const onApplicationShutdown = async () => { + await promiseAll([ + MedusaModule.onApplicationShutdown(), + sharedContainer_.dispose(), + ]) + } const modules: MedusaModuleConfig = modulesConfig ?? @@ -312,6 +311,7 @@ async function MedusaApp_({ if (loaderOnly) { return { + onApplicationShutdown, modules: allModules, link: undefined, query: async () => { @@ -387,6 +387,7 @@ async function MedusaApp_({ } return { + onApplicationShutdown, modules: allModules, link: remoteLink, query, diff --git a/packages/modules-sdk/src/medusa-module.ts b/packages/modules-sdk/src/medusa-module.ts index dabe000523..59237a65f9 100644 --- a/packages/modules-sdk/src/medusa-module.ts +++ b/packages/modules-sdk/src/medusa-module.ts @@ -15,6 +15,7 @@ import { } from "@medusajs/types" import { createMedusaContainer, + promiseAll, simpleHash, stringifyCircular, } from "@medusajs/utils" @@ -116,6 +117,20 @@ export class MedusaModule { } } } + public static async onApplicationShutdown(): Promise { + await promiseAll( + [...MedusaModule.instances_.values()].map(instances => { + return Object.values(instances).map((instance: IModuleService) => { + return instance.__hooks?.onApplicationShutdown + ?.bind(instance)() + .catch(() => { + // The module should handle this and log it + return void 0 + }) + }) + }).flat() + ) + } public static clearInstances(): void { MedusaModule.instances_.clear() diff --git a/packages/types/src/modules-sdk/index.ts b/packages/types/src/modules-sdk/index.ts index 4aaf939a71..7ffc938e61 100644 --- a/packages/types/src/modules-sdk/index.ts +++ b/packages/types/src/modules-sdk/index.ts @@ -294,5 +294,6 @@ export interface IModuleService { */ __hooks?: { onApplicationStart?: () => Promise + onApplicationShutdown?: () => Promise } } diff --git a/packages/workflow-engine-redis/src/loaders/redis.ts b/packages/workflow-engine-redis/src/loaders/redis.ts index 8321a6d147..51c0cc3a66 100644 --- a/packages/workflow-engine-redis/src/loaders/redis.ts +++ b/packages/workflow-engine-redis/src/loaders/redis.ts @@ -63,6 +63,12 @@ export default async ({ redisPublisher: asValue(redisPublisher), redisSubscriber: asValue(redisSubscriber), redisQueueName: asValue(queueName), + redisDisconnectHandler: asValue(async () => { + connection.disconnect() + workerConnection.disconnect() + redisPublisher.disconnect() + redisSubscriber.disconnect() + }), }) } diff --git a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts index 8df4e3be02..40d12b1bb1 100644 --- a/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -5,7 +5,7 @@ import { TransactionStep, } from "@medusajs/orchestration" import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" +import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils" import { FlowRunOptions, MedusaWorkflow, @@ -76,6 +76,8 @@ export class WorkflowOrchestratorService { protected redisSubscriber: Redis private subscribers: Subscribers = new Map() + protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage + constructor({ redisDistributedTransactionStorage, redisPublisher, @@ -92,6 +94,9 @@ export class WorkflowOrchestratorService { redisDistributedTransactionStorage.setWorkflowOrchestratorService(this) DistributedTransaction.setStorage(redisDistributedTransactionStorage) + this.redisDistributedTransactionStorage_ = + redisDistributedTransactionStorage + this.redisSubscriber.on("message", async (_, message) => { const { instanceId, data } = JSON.parse(message) @@ -99,6 +104,10 @@ export class WorkflowOrchestratorService { }) } + async onApplicationShutdown() { + await this.redisDistributedTransactionStorage_.onApplicationShutdown() + } + @InjectSharedContext() async run( workflowIdOrWorkflow: string | ReturnWorkflow, diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index 8701e6d26d..d0fb245f83 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -9,9 +9,9 @@ import { import { InjectManager, InjectSharedContext, + isString, MedusaContext, MedusaError, - isString, } from "@medusajs/utils" import type { IWorkflowEngineService, @@ -26,30 +26,41 @@ type InjectedDependencies = { baseRepository: DAL.RepositoryService workflowExecutionService: ModulesSdkTypes.InternalModuleService workflowOrchestratorService: WorkflowOrchestratorService + redisDisconnectHandler: () => Promise } export class WorkflowsModuleService implements IWorkflowEngineService { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService protected workflowOrchestratorService_: WorkflowOrchestratorService + protected redisDisconnectHandler_: () => Promise constructor( { baseRepository, workflowExecutionService, workflowOrchestratorService, + redisDisconnectHandler, }: InjectedDependencies, protected readonly moduleDeclaration: InternalModuleDeclaration ) { this.baseRepository_ = baseRepository this.workflowExecutionService_ = workflowExecutionService this.workflowOrchestratorService_ = workflowOrchestratorService + this.redisDisconnectHandler_ = redisDisconnectHandler } __joinerConfig(): ModuleJoinerConfig { return joinerConfig } + __hooks = { + onApplicationShutdown: async () => { + await this.workflowOrchestratorService_.onApplicationShutdown() + await this.redisDisconnectHandler_() + }, + } + @InjectManager("baseRepository_") async retrieveWorkflowExecution( idOrObject: @@ -191,7 +202,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { transactionId: string, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.getRunningTransaction( + return await this.workflowOrchestratorService_.getRunningTransaction( workflowId, transactionId, context @@ -211,7 +222,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.setStepSuccess( + return await this.workflowOrchestratorService_.setStepSuccess( { idempotencyKey, stepResponse, @@ -234,7 +245,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService { }, @MedusaContext() context: Context = {} ) { - return this.workflowOrchestratorService_.setStepFailure( + return await this.workflowOrchestratorService_.setStepFailure( { idempotencyKey, stepResponse, diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 821703e396..6717f0bd1e 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -64,6 +64,11 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt ) } + async onApplicationShutdown() { + await this.queue.close() + await this.worker.close(true) + } + setWorkflowOrchestratorService(workflowOrchestratorService) { this.workflowOrchestratorService_ = workflowOrchestratorService }