diff --git a/.changeset/fuzzy-stingrays-battle.md b/.changeset/fuzzy-stingrays-battle.md new file mode 100644 index 0000000000..dda19bede9 --- /dev/null +++ b/.changeset/fuzzy-stingrays-battle.md @@ -0,0 +1,9 @@ +--- +"@medusajs/medusa": patch +"@medusajs/event-bus-local": patch +"@medusajs/framework": patch +"@medusajs/utils": patch +"@medusajs/workflows-sdk": patch +--- + +chore: Backend HMR (expriemental) diff --git a/packages/core/framework/package.json b/packages/core/framework/package.json index b5dbf7d70b..538d7aa353 100644 --- a/packages/core/framework/package.json +++ b/packages/core/framework/package.json @@ -99,8 +99,7 @@ "@aws-sdk/client-dynamodb": "^3.218.0", "@medusajs/cli": "2.12.1", "connect-dynamodb": "^3.0.5", - "ioredis": "^5.4.1", - "vite": "^5.4.21" + "ioredis": "^5.4.1" }, "peerDependenciesMeta": { "@aws-sdk/client-dynamodb": { diff --git a/packages/core/framework/src/http/router.ts b/packages/core/framework/src/http/router.ts index 7176edafa0..0037539b2e 100644 --- a/packages/core/framework/src/http/router.ts +++ b/packages/core/framework/src/http/router.ts @@ -1,6 +1,11 @@ -import { ContainerRegistrationKeys, parseCorsOrigins } from "@medusajs/utils" +import { ContainerRegistrationKeys, parseCorsOrigins, FeatureFlag } from "@medusajs/utils" import cors, { CorsOptions } from "cors" -import type { ErrorRequestHandler, Express, RequestHandler } from "express" +import type { + ErrorRequestHandler, + Express, + IRouter, + RequestHandler, +} from "express" import type { AdditionalDataValidatorRoute, BodyParserConfigRoute, @@ -83,6 +88,7 @@ export class ApiLoader { */ async #loadHttpResources() { const routesLoader = new RoutesLoader() + const middlewareLoader = new MiddlewareFileLoader() for (const dir of this.#sourceDirs) { @@ -119,6 +125,7 @@ export class ApiLoader { : route.handler this.#app[route.method.toLowerCase()](route.matcher, wrapHandler(handler)) + return } @@ -354,6 +361,10 @@ export class ApiLoader { } async load() { + if (FeatureFlag.isFeatureEnabled("backend_hmr")) { + ;(global as any).__MEDUSA_HMR_API_LOADER__ = this + } + const { errorHandler: sourceErrorHandler, middlewares, @@ -462,4 +473,19 @@ export class ApiLoader { */ this.#app.use(sourceErrorHandler ?? errorHandler()) } + + /** + * Clear all API resources registered by this loader + * This removes all routes and middleware added after the initial stack state + * Used by HMR to reset the API state before reloading + */ + clearAllResources() { + const router = this.#app._router as IRouter + const initialStackLength = + (global as any).__MEDUSA_HMR_INITIAL_STACK_LENGTH__ ?? 0 + + if (router && router.stack) { + router.stack.splice(initialStackLength) + } + } } diff --git a/packages/core/framework/src/http/routes-loader.ts b/packages/core/framework/src/http/routes-loader.ts index bbc072f256..638025cd31 100644 --- a/packages/core/framework/src/http/routes-loader.ts +++ b/packages/core/framework/src/http/routes-loader.ts @@ -54,7 +54,7 @@ export class RoutesLoader { /** * Creates the route path from its relative file path. */ - #createRoutePath(relativePath: string): string { + createRoutePath(relativePath: string): string { const segments = relativePath.replace(/route(\.js|\.ts)$/, "").split(sep) const params: Record = {} @@ -186,7 +186,7 @@ export class RoutesLoader { .map(async (entry) => { const absolutePath = join(entry.path, entry.name) const relativePath = absolutePath.replace(sourceDir, "") - const route = this.#createRoutePath(relativePath) + const route = this.createRoutePath(relativePath) const routes = await this.#getRoutesForFile(route, absolutePath) routes.forEach((routeConfig) => { @@ -233,4 +233,32 @@ export class RoutesLoader { [] ) } + + /** + * Reload a single route file + * This is used by HMR to reload routes when files change + */ + async reloadRouteFile( + absolutePath: string, + sourceDir: string + ): Promise { + const relativePath = absolutePath.replace(sourceDir, "") + const route = this.createRoutePath(relativePath) + const routes = await this.#getRoutesForFile(route, absolutePath) + + // Register the new routes (will overwrite existing) + routes.forEach((routeConfig) => { + this.registerRoute({ + absolutePath, + relativePath, + ...routeConfig, + }) + }) + + return routes.map((routeConfig) => ({ + absolutePath, + relativePath, + ...routeConfig, + })) + } } diff --git a/packages/core/framework/src/jobs/job-loader.ts b/packages/core/framework/src/jobs/job-loader.ts index 20b79a4d21..d59b5a61bb 100644 --- a/packages/core/framework/src/jobs/job-loader.ts +++ b/packages/core/framework/src/jobs/job-loader.ts @@ -1,6 +1,12 @@ import type { SchedulerOptions } from "@medusajs/orchestration" import { MedusaContainer } from "@medusajs/types" -import { isFileSkipped, isObject, MedusaError } from "@medusajs/utils" +import { + dynamicImport, + isFileSkipped, + isObject, + MedusaError, + registerDevServerResource, +} from "@medusajs/utils" import { createStep, createWorkflow, @@ -23,6 +29,11 @@ export class JobLoader extends ResourceLoader { super(sourceDir, container) } + async loadFile(path: string) { + const exports = await dynamicImport(path) + await this.onFileLoaded(path, exports) + } + protected async onFileLoaded( path: string, fileExports: { @@ -37,6 +48,7 @@ export class JobLoader extends ResourceLoader { this.validateConfig(fileExports.config) this.logger.debug(`Registering job from ${path}.`) this.register({ + path, config: fileExports.config, handler: fileExports.default, }) @@ -80,9 +92,11 @@ export class JobLoader extends ResourceLoader { * @protected */ protected register({ + path, config, handler, }: { + path: string config: CronJobConfig handler: CronJobHandler }) { @@ -116,6 +130,13 @@ export class JobLoader extends ResourceLoader { createWorkflow(workflowConfig, () => { step() }) + + registerDevServerResource({ + sourcePath: path, + id: workflowName, + type: "job", + config: config, + }) } /** diff --git a/packages/core/framework/src/medusa-app-loader.ts b/packages/core/framework/src/medusa-app-loader.ts index 82ad09597e..9e28397708 100644 --- a/packages/core/framework/src/medusa-app-loader.ts +++ b/packages/core/framework/src/medusa-app-loader.ts @@ -5,6 +5,7 @@ import { MedusaAppMigrateGenerate, MedusaAppMigrateUp, MedusaAppOutput, + MedusaModule, ModulesDefinition, RegisterModuleJoinerConfig, } from "@medusajs/modules-sdk" @@ -12,6 +13,7 @@ import { CommonTypes, ConfigModule, ILinkMigrationsPlanner, + IModuleService, InternalModuleDeclaration, LoadedModule, ModuleDefinition, @@ -235,6 +237,76 @@ export class MedusaAppLoader { }) } + /** + * Reload a single module by its key + * @param moduleKey - The key of the module to reload (e.g., 'contactUsModuleService') + */ + async reloadSingleModule({ + moduleKey, + serviceName, + }: { + /** + * the key of the module to reload in the medusa config (either infered or specified) + */ + moduleKey: string + /** + * Registration name of the service to reload in the container + */ + serviceName: string + }): Promise { + const configModule: ConfigModule = this.#container.resolve( + ContainerRegistrationKeys.CONFIG_MODULE + ) + MedusaModule.unregisterModuleResolution(moduleKey) + if (serviceName) { + this.#container.cache.delete(serviceName) + } + + const moduleConfig = configModule.modules?.[moduleKey] + if (!moduleConfig) { + return null + } + + const { sharedResourcesConfig, injectedDependencies } = + this.prepareSharedResourcesAndDeps() + + const mergedModules = this.mergeDefaultModules({ + [moduleKey]: moduleConfig, + }) + const moduleDefinition = mergedModules[moduleKey] + + const result = await MedusaApp({ + modulesConfig: { [moduleKey]: moduleDefinition }, + sharedContainer: this.#container, + linkModules: this.#customLinksModules, + sharedResourcesConfig, + injectedDependencies, + workerMode: configModule.projectConfig?.workerMode, + medusaConfigPath: this.#medusaConfigPath, + cwd: this.#cwd, + }) + + const loadedModule = result.modules[moduleKey] as LoadedModule & + IModuleService + if (loadedModule) { + this.#container.register({ + [loadedModule.__definition.key]: asValue(loadedModule), + }) + } + + if (loadedModule?.__hooks?.onApplicationStart) { + await loadedModule.__hooks.onApplicationStart + .bind(loadedModule)() + .catch((error: any) => { + injectedDependencies[ContainerRegistrationKeys.LOGGER].error( + `Error starting module "${loadedModule.__definition.key}": ${error.message}` + ) + }) + } + + return loadedModule + } + /** * Load all modules and bootstrap all the modules and links to be ready to be consumed * @param config diff --git a/packages/core/framework/src/subscribers/subscriber-loader.ts b/packages/core/framework/src/subscribers/subscriber-loader.ts index daa18fd8ed..e3f06149f4 100644 --- a/packages/core/framework/src/subscribers/subscriber-loader.ts +++ b/packages/core/framework/src/subscribers/subscriber-loader.ts @@ -4,7 +4,12 @@ import { MedusaContainer, Subscriber, } from "@medusajs/types" -import { isFileSkipped, kebabCase, Modules } from "@medusajs/utils" +import { + isFileSkipped, + kebabCase, + Modules, + registerDevServerResource, +} from "@medusajs/utils" import { parse } from "path" import { configManager } from "../config" import { container } from "../container" @@ -154,7 +159,7 @@ export class SubscriberLoader extends ResourceLoader { return kebabCase(idFromFile) } - private createSubscriber({ + createSubscriber({ fileName, config, handler, @@ -186,6 +191,14 @@ export class SubscriberLoader extends ResourceLoader { ...config.context, subscriberId, }) + + registerDevServerResource({ + type: "subscriber", + id: subscriberId, + sourcePath: fileName, + subscriberId, + events, + }) } } diff --git a/packages/core/modules-sdk/src/medusa-module.ts b/packages/core/modules-sdk/src/medusa-module.ts index 65e9fffec6..abdd2147a1 100644 --- a/packages/core/modules-sdk/src/medusa-module.ts +++ b/packages/core/modules-sdk/src/medusa-module.ts @@ -203,6 +203,20 @@ class MedusaModule { return [...MedusaModule.moduleResolutions_.values()] } + public static unregisterModuleResolution(moduleKey: string): void { + MedusaModule.moduleResolutions_.delete(moduleKey) + MedusaModule.joinerConfig_.delete(moduleKey) + const moduleAliases = MedusaModule.modules_ + .get(moduleKey) + ?.map((m) => m.alias || m.hash) + if (moduleAliases) { + for (const alias of moduleAliases) { + MedusaModule.instances_.delete(alias) + } + } + MedusaModule.modules_.delete(moduleKey) + } + public static setModuleResolution( moduleKey: string, resolution: ModuleResolution @@ -516,25 +530,27 @@ class MedusaModule { } const resolvedServices = await promiseAll( - loadedModules.map(async ({ - hashKey, - modDeclaration, - moduleResolutions, - container, - finishLoading, - }) => { - const service = await MedusaModule.resolveLoadedModule({ + loadedModules.map( + async ({ hashKey, modDeclaration, moduleResolutions, container, - }) + finishLoading, + }) => { + const service = await MedusaModule.resolveLoadedModule({ + hashKey, + modDeclaration, + moduleResolutions, + container, + }) - MedusaModule.instances_.set(hashKey, service) - finishLoading(service) - MedusaModule.loading_.delete(hashKey) - return service - }) + MedusaModule.instances_.set(hashKey, service) + finishLoading(service) + MedusaModule.loading_.delete(hashKey) + return service + } + ) ) services.push(...resolvedServices) @@ -590,7 +606,10 @@ class MedusaModule { try { // TODO: rework that to store on a separate property - joinerConfig = await services[keyName].__joinerConfig?.() + joinerConfig = + typeof services[keyName].__joinerConfig === "function" + ? await services[keyName].__joinerConfig?.() + : services[keyName].__joinerConfig } catch { // noop } diff --git a/packages/core/utils/src/bundles.ts b/packages/core/utils/src/bundles.ts index 6e7d41ad29..d26d9748f9 100644 --- a/packages/core/utils/src/bundles.ts +++ b/packages/core/utils/src/bundles.ts @@ -19,3 +19,4 @@ export * as SearchUtils from "./search" export * as ShippingProfileUtils from "./shipping" export * as UserUtils from "./user" export * as CachingUtils from "./caching" +export * as DevServerUtils from "./dev-server" diff --git a/packages/core/utils/src/dev-server/handlers/job-handler.ts b/packages/core/utils/src/dev-server/handlers/job-handler.ts new file mode 100644 index 0000000000..37edffb9b3 --- /dev/null +++ b/packages/core/utils/src/dev-server/handlers/job-handler.ts @@ -0,0 +1,44 @@ +import { JobResourceData, ResourceEntry, ResourceTypeHandler } from "../types" + +export class JobHandler implements ResourceTypeHandler { + readonly type = "job" + + validate(data: JobResourceData): void { + if (!data.id) { + throw new Error( + `Job registration requires id. Received: ${JSON.stringify(data)}` + ) + } + + if (!data.sourcePath) { + throw new Error( + `Job registration requires sourcePath. Received: ${JSON.stringify( + data + )}` + ) + } + + if (!data.config?.name) { + throw new Error( + `Job registration requires config.name. Received: ${JSON.stringify( + data + )}` + ) + } + } + + resolveSourcePath(data: JobResourceData): string { + return data.sourcePath + } + + createEntry(data: JobResourceData): ResourceEntry { + return { + id: data.id, + config: data.config, + } + } + + getInverseKey(data: JobResourceData): string { + return `${this.type}:${data.config?.name}` + } +} diff --git a/packages/core/utils/src/dev-server/handlers/step-handler.ts b/packages/core/utils/src/dev-server/handlers/step-handler.ts new file mode 100644 index 0000000000..ff88632d24 --- /dev/null +++ b/packages/core/utils/src/dev-server/handlers/step-handler.ts @@ -0,0 +1,52 @@ +import { ResourceEntry, ResourceTypeHandler, StepResourceData } from "../types" + +export class StepHandler implements ResourceTypeHandler { + readonly type = "step" + + constructor(private inverseRegistry: Map) {} + + validate(data: StepResourceData): void { + if (!data.id) { + throw new Error( + `Step registration requires id. Received: ${JSON.stringify(data)}` + ) + } + + if (!data.sourcePath && !data.workflowId) { + throw new Error( + `Step registration requires either sourcePath or workflowId. Received: ${JSON.stringify( + data + )}` + ) + } + } + + resolveSourcePath(data: StepResourceData): string { + if (data.sourcePath) { + return data.sourcePath + } + + // Look up workflow's source path + const workflowKey = `workflow:${data.workflowId}` + const workflowSourcePaths = this.inverseRegistry.get(workflowKey) + + if (!workflowSourcePaths || workflowSourcePaths.length === 0) { + throw new Error( + `step workflow not found: ${data.workflowId} for step ${data.id}` + ) + } + + return workflowSourcePaths[0] + } + + createEntry(data: StepResourceData): ResourceEntry { + return { + id: data.id, + workflowId: data.workflowId, + } + } + + getInverseKey(data: StepResourceData): string { + return `${this.type}:${data.id}` + } +} diff --git a/packages/core/utils/src/dev-server/handlers/subscriber-handler.ts b/packages/core/utils/src/dev-server/handlers/subscriber-handler.ts new file mode 100644 index 0000000000..dd3ac71a6a --- /dev/null +++ b/packages/core/utils/src/dev-server/handlers/subscriber-handler.ts @@ -0,0 +1,67 @@ +import { + ResourceEntry, + ResourceTypeHandler, + SubscriberResourceData, +} from "../types" + +export class SubscriberHandler + implements ResourceTypeHandler +{ + readonly type = "subscriber" + + validate(data: SubscriberResourceData): void { + if (!data.id) { + throw new Error( + `Subscriber registration requires id. Received: ${JSON.stringify(data)}` + ) + } + + if (!data.sourcePath) { + throw new Error( + `Subscriber registration requires sourcePath. Received: ${JSON.stringify( + data + )}` + ) + } + + if (!data.subscriberId) { + throw new Error( + `Subscriber registration requires subscriberId. Received: ${JSON.stringify( + data + )}` + ) + } + + if (!data.events) { + throw new Error( + `Subscriber registration requires events. Received: ${JSON.stringify( + data + )}` + ) + } + + if (!Array.isArray(data.events)) { + throw new Error( + `Subscriber registration requires events to be an array. Received: ${JSON.stringify( + data + )}` + ) + } + } + + resolveSourcePath(data: SubscriberResourceData): string { + return data.sourcePath + } + + createEntry(data: SubscriberResourceData): ResourceEntry { + return { + id: data.id, + subscriberId: data.subscriberId, + events: data.events, + } + } + + getInverseKey(data: SubscriberResourceData): string { + return `${this.type}:${data.subscriberId}` + } +} diff --git a/packages/core/utils/src/dev-server/handlers/workflow-handler.ts b/packages/core/utils/src/dev-server/handlers/workflow-handler.ts new file mode 100644 index 0000000000..4bc60254df --- /dev/null +++ b/packages/core/utils/src/dev-server/handlers/workflow-handler.ts @@ -0,0 +1,42 @@ +import { + ResourceEntry, + ResourceTypeHandler, + WorkflowResourceData, +} from "../types" + +export class WorkflowHandler + implements ResourceTypeHandler +{ + readonly type = "workflow" + + validate(data: WorkflowResourceData): void { + if (!data.sourcePath) { + throw new Error( + `Workflow registration requires sourcePath. Received: ${JSON.stringify( + data + )}` + ) + } + + if (!data.id) { + throw new Error( + `Workflow registration requires id. Received: ${JSON.stringify(data)}` + ) + } + } + + resolveSourcePath(data: WorkflowResourceData): string { + return data.sourcePath + } + + createEntry(data: WorkflowResourceData): ResourceEntry { + return { + id: data.id, + workflowId: data.id, + } + } + + getInverseKey(data: WorkflowResourceData): string { + return `${this.type}:${data.id}` + } +} diff --git a/packages/core/utils/src/dev-server/index.ts b/packages/core/utils/src/dev-server/index.ts new file mode 100644 index 0000000000..9a11166863 --- /dev/null +++ b/packages/core/utils/src/dev-server/index.ts @@ -0,0 +1,154 @@ +import { FeatureFlag } from "../feature-flags" +import { JobHandler } from "./handlers/job-handler" +import { StepHandler } from "./handlers/step-handler" +import { SubscriberHandler } from "./handlers/subscriber-handler" +import { WorkflowHandler } from "./handlers/workflow-handler" +import { + addToInverseRegistry, + addToRegistry, + getOrCreateRegistry, +} from "./registry-helpers" +import { + BaseResourceData, + ResourceMap, + ResourcePath, + ResourceRegistrationData, + ResourceTypeHandler, +} from "./types" + +export type { + BaseResourceData, + ResourceEntry, + ResourceMap, + ResourcePath, + ResourceType, + ResourceTypeHandler, +} from "./types" + +/** + * Maps source file paths to their registered resources + * Structure: sourcePath -> Map + */ +export const globalDevServerRegistry = new Map() + +/** + * Inverse registry for looking up source paths by resource + * Structure: "type:id" -> sourcePath[] + * Used to find which files contain a specific resource + */ +export const inverseDevServerRegistry = new Map() + +/** + * Registry of resource type handlers + * Each handler implements the logic for a specific resource type + */ +const resourceHandlers = new Map() + +/** + * Register a resource type handler + * + * @example + * ```typescript + * class RouteHandler implements ResourceTypeHandler { + * readonly type = "route" + * validate(data: RouteData): void { ... } + * resolveSourcePath(data: RouteData): string { ... } + * createEntry(data: RouteData): ResourceEntry { ... } + * getInverseKey(data: RouteData): string { ... } + * } + * + * registerResourceTypeHandler(new RouteHandler()) + * ``` + */ +export function registerResourceTypeHandler( + handler: ResourceTypeHandler +): void { + if (resourceHandlers.has(handler.type)) { + console.warn( + `Resource type handler for "${handler.type}" is being overridden` + ) + } + + resourceHandlers.set(handler.type, handler) +} + +registerResourceTypeHandler(new WorkflowHandler()) +registerResourceTypeHandler(new StepHandler(inverseDevServerRegistry)) +registerResourceTypeHandler(new SubscriberHandler()) +registerResourceTypeHandler(new JobHandler()) + +/** + * Register a resource in the dev server for hot module reloading + * + * This function uses a strategy pattern where each resource type has its own handler. + * The handler is responsible for: + * - Validating the registration data + * - Resolving the source path + * - Creating the registry entry + * - Generating the inverse registry key + * + * @param data - Resource registration data + * @throws Error if validation fails or handler is not found + * + * @example + * ```typescript + * // Register a workflow + * registerDevServerResource({ + * type: "workflow", + * id: "create-product", + * sourcePath: "/src/workflows/create-product.ts" + * }) + * + * // Register a step + * registerDevServerResource({ + * type: "step", + * id: "validate-product", + * workflowId: "create-product" + * }) + * ``` + */ +export function registerDevServerResource(data: ResourceRegistrationData): void +export function registerDevServerResource( + data: T +): void +export function registerDevServerResource( + data: T +): void { + // Skip registration in production or if HMR is disabled + const isProduction = ["production", "prod"].includes( + process.env.NODE_ENV || "" + ) + + if (!FeatureFlag.isFeatureEnabled("backend_hmr") || isProduction) { + return + } + + const handler = resourceHandlers.get(data.type) + + if (!handler) { + throw new Error( + `No handler registered for resource type "${data.type}". ` + + `Available types: ${Array.from(resourceHandlers.keys()).join(", ")}. ` + + `Use registerResourceTypeHandler() to add support for custom types.` + ) + } + + try { + handler.validate(data) + + const sourcePath = handler.resolveSourcePath(data) + + const registry = getOrCreateRegistry(globalDevServerRegistry, sourcePath) + + const entry = handler.createEntry(data) + addToRegistry(registry, data.type, entry) + + const inverseKey = handler.getInverseKey(data) + addToInverseRegistry(inverseDevServerRegistry, inverseKey, sourcePath) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + throw new Error( + `Failed to register ${data.type} resource "${data.id}": ${errorMessage}` + ) + } +} diff --git a/packages/core/utils/src/dev-server/registry-helpers.ts b/packages/core/utils/src/dev-server/registry-helpers.ts new file mode 100644 index 0000000000..5e62b9d3a7 --- /dev/null +++ b/packages/core/utils/src/dev-server/registry-helpers.ts @@ -0,0 +1,34 @@ +import { ResourceEntry, ResourceMap, ResourcePath } from "./types" + +export function getOrCreateRegistry( + globalRegistry: Map, + sourcePath: string +): ResourceMap { + let registry = globalRegistry.get(sourcePath) + + if (!registry) { + registry = new Map() + globalRegistry.set(sourcePath, registry) + } + + return registry +} + +export function addToRegistry( + registry: ResourceMap, + type: string, + entry: ResourceEntry +): void { + const entries = registry.get(type) || [] + registry.set(type, [...entries, entry]) +} + +export function addToInverseRegistry( + inverseRegistry: Map, + key: string, + sourcePath: string +): void { + const existing = inverseRegistry.get(key) || [] + const updated = Array.from(new Set([...existing, sourcePath])) + inverseRegistry.set(key, updated) +} diff --git a/packages/core/utils/src/dev-server/types.ts b/packages/core/utils/src/dev-server/types.ts new file mode 100644 index 0000000000..db2ebaa760 --- /dev/null +++ b/packages/core/utils/src/dev-server/types.ts @@ -0,0 +1,59 @@ +export type ResourcePath = string +export type ResourceType = string +export type ResourceEntry = { + id: string + workflowId?: string + [key: string]: any +} +export type ResourceMap = Map + +export interface BaseResourceData { + type: string + id: string + sourcePath?: string +} + +export interface WorkflowResourceData extends BaseResourceData { + type: "workflow" + sourcePath: string +} + +export interface StepResourceData extends BaseResourceData { + type: "step" + workflowId?: string + sourcePath?: string +} + +export interface SubscriberResourceData extends BaseResourceData { + type: "subscriber" + sourcePath: string + subscriberId: string + events: string[] +} + +export interface JobResourceData extends BaseResourceData { + type: "job" + sourcePath: string + config: { + name: string + } +} + +export type ResourceRegistrationData = + | WorkflowResourceData + | StepResourceData + | SubscriberResourceData + +export interface ResourceTypeHandler< + T extends BaseResourceData = BaseResourceData +> { + readonly type: string + + validate(data: T): void + + resolveSourcePath(data: T): string + + createEntry(data: T): ResourceEntry + + getInverseKey(data: T): string +} diff --git a/packages/core/utils/src/index.ts b/packages/core/utils/src/index.ts index e53e8b53be..0f5a72e759 100644 --- a/packages/core/utils/src/index.ts +++ b/packages/core/utils/src/index.ts @@ -30,6 +30,7 @@ export * from "./totals" export * from "./totals/big-number" export * from "./user" export * from "./caching" +export * from "./dev-server" export const MedusaModuleType = Symbol.for("MedusaModule") export const MedusaModuleProviderType = Symbol.for("MedusaModuleProvider") diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 8e43f79ba1..e80809051c 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,7 +4,13 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { isDefined, isString, OrchestrationUtils } from "@medusajs/utils" +import { + getCallerFilePath, + isDefined, + isString, + OrchestrationUtils, + registerDevServerResource, +} from "@medusajs/utils" import { ulid } from "ulid" import { resolveValue, StepResponse } from "./helpers" import { createStepHandler } from "./helpers/create-step-handler" @@ -159,6 +165,12 @@ export function applyStep< ) } + registerDevServerResource({ + id: stepName, + type: "step", + workflowId: this.workflowId!, + }) + const handler = createAndConfigureHandler( this, stepName, @@ -490,5 +502,12 @@ export function createStep< returnFn.__type = OrchestrationUtils.SymbolWorkflowStepBind returnFn.__step__ = stepName + const sourcePath = getCallerFilePath() as string + registerDevServerResource({ + id: stepName, + type: "step", + sourcePath, + }) + return returnFn } diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 261c6815d8..cb0b489844 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -13,6 +13,7 @@ import { isString, Modules, OrchestrationUtils, + registerDevServerResource, } from "@medusajs/utils" import { ulid } from "ulid" import { exportWorkflow, WorkflowResult } from "../../helper" @@ -116,6 +117,12 @@ export function createWorkflow( const name = isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name const options = isString(nameOrConfig) ? {} : nameOrConfig + registerDevServerResource({ + sourcePath: fileSourcePath, + id: name, + type: "workflow", + }) + const handlers: WorkflowHandler = new Map() let newWorkflow = false diff --git a/packages/medusa/package.json b/packages/medusa/package.json index 9db46729d3..d15f0cbfbb 100644 --- a/packages/medusa/package.json +++ b/packages/medusa/package.json @@ -106,7 +106,7 @@ "@medusajs/workflow-engine-redis": "2.12.1", "boxen": "^5.0.1", "chalk": "^4.1.2", - "chokidar": "^3.5.3", + "chokidar": "^4.0.3", "compression": "^1.8.0", "express": "^4.21.0", "fs-exists-cached": "^1.0.0", diff --git a/packages/medusa/src/commands/develop.ts b/packages/medusa/src/commands/develop.ts index 6425b6e4f8..abc031eb43 100644 --- a/packages/medusa/src/commands/develop.ts +++ b/packages/medusa/src/commands/develop.ts @@ -1,11 +1,15 @@ import { MEDUSA_CLI_PATH } from "@medusajs/framework" -import { ContainerRegistrationKeys } from "@medusajs/framework/utils" +import { + ContainerRegistrationKeys, + FeatureFlag, +} from "@medusajs/framework/utils" import { Store } from "@medusajs/telemetry" import boxen from "boxen" import { ChildProcess, execSync, fork } from "child_process" import chokidar, { FSWatcher } from "chokidar" import { EOL } from "os" import path from "path" +import BackendHmrFeatureFlag from "../feature-flags/backend-hmr" import { initializeContainer } from "../loaders" const defaultConfig = { @@ -18,6 +22,23 @@ export default async function ({ types, directory }) { const container = await initializeContainer(directory) const logger = container.resolve(ContainerRegistrationKeys.LOGGER) + const isBackendHmrEnabled = FeatureFlag.isFeatureEnabled( + BackendHmrFeatureFlag.key + ) + + const reloadActionVerb = isBackendHmrEnabled ? "reloading" : "restarting" + const logSource = isBackendHmrEnabled ? "[HMR]" : "[Watcher]" + + if (isBackendHmrEnabled) { + logger.info( + `${logSource} Using backend HMR dev server (reload on file change)` + ) + } else { + logger.info( + `${logSource} Using standard dev server (restart on file change)` + ) + } + const args = process.argv const argv = @@ -53,19 +74,66 @@ export default async function ({ types, directory }) { * restart the dev server instead of asking the user to re-run * the command. */ - start() { - this.childProcess = fork(cliPath, ["start", ...args], { + async start() { + const forkOptions: any = { cwd: directory, env: { ...process.env, NODE_ENV: "development", + ...(isBackendHmrEnabled && { MEDUSA_HMR_ENABLED: "true" }), }, execArgv: argv, - }) + } + + // Enable IPC for HMR mode to communicate reload requests + if (isBackendHmrEnabled) { + forkOptions.stdio = ["inherit", "inherit", "inherit", "ipc"] + } + + this.childProcess = fork(cliPath, ["start", ...args], forkOptions) + this.childProcess.on("error", (error) => { // @ts-ignore - logger.error("Dev server failed to start", error) - logger.info("The server will restart automatically after your changes") + logger.error(`${logSource} Dev server failed to start`, error) + logger.info( + `${logSource} The server will restart automatically after your changes` + ) + }) + }, + + /** + * Sends an HMR reload request to the child process and waits for result. + * Returns true if reload succeeded, false if it failed. + */ + async sendHmrReload( + action: "add" | "change" | "unlink", + file: string + ): Promise { + return new Promise((resolve) => { + if (!this.childProcess) { + resolve(false) + return + } + + const timeout = setTimeout(() => { + resolve(false) + }, 30000) // 30s timeout + + const messageHandler = (msg: any) => { + if (msg?.type === "hmr-result") { + clearTimeout(timeout) + this.childProcess?.off("message", messageHandler) + resolve(msg.success === true) + } + } + + this.childProcess.on("message", messageHandler) + this.childProcess.send({ + type: "hmr-reload", + action, + file: path.resolve(directory, file), + rootDirectory: directory, + }) }) }, @@ -73,7 +141,18 @@ export default async function ({ types, directory }) { * Restarts the development server by cleaning up the existing * child process and forking a new one */ - restart() { + async restart(action: "add" | "change" | "unlink", file: string) { + if (isBackendHmrEnabled && this.childProcess) { + const success = await this.sendHmrReload(action, file) + + if (success) { + return + } + + // HMR reload failed, kill the process and restart + logger.info(`${logSource} HMR reload failed, restarting server...`) + } + if (this.childProcess) { this.childProcess.removeAllListeners() if (process.platform === "win32") { @@ -82,7 +161,7 @@ export default async function ({ types, directory }) { this.childProcess.kill("SIGINT") } } - this.start() + await this.start() }, /** @@ -94,9 +173,9 @@ export default async function ({ types, directory }) { * - src/admin/** */ watch() { - this.watcher = chokidar.watch(["."], { + this.watcher = chokidar.watch(".", { ignoreInitial: true, - cwd: process.cwd(), + cwd: directory, ignored: [ /(^|[\\/\\])\../, "node_modules", @@ -108,27 +187,46 @@ export default async function ({ types, directory }) { ], }) - this.watcher.on("add", (file) => { + async function handleFileChange( + this: typeof devServer, + action: "add" | "change" | "unlink", + file: string + ) { + const actionVerb = + action === "add" + ? "created" + : action === "change" + ? "modified" + : "removed" + + const now = performance.now() logger.info( - `${path.relative(directory, file)} created: Restarting dev server` + `${logSource} ${actionVerb} ${path.relative( + directory, + file + )} ${actionVerb}: ${reloadActionVerb} dev server` ) - this.restart() + + await this.restart(action, file) + + const duration = performance.now() - now + logger.info(`${logSource} Reloaded in ${duration.toFixed(2)}ms`) + } + + this.watcher.on("add", async (file) => { + handleFileChange.call(this, "add", file) }) - this.watcher.on("change", (file) => { - logger.info( - `${path.relative(directory, file)} modified: Restarting dev server` - ) - this.restart() + this.watcher.on("change", async (file) => { + handleFileChange.call(this, "change", file) }) - this.watcher.on("unlink", (file) => { - logger.info( - `${path.relative(directory, file)} removed: Restarting dev server` - ) - this.restart() + this.watcher.on("unlink", async (file) => { + handleFileChange.call(this, "unlink", file) }) this.watcher.on("ready", function () { - logger.info(`Watching filesystem to reload dev server on file change`) + logger.info( + `${logSource} Watching filesystem to reload dev server on file change` + ) }) }, } @@ -152,6 +250,6 @@ export default async function ({ types, directory }) { process.exit(0) }) - devServer.start() + await devServer.start() devServer.watch() } diff --git a/packages/medusa/src/commands/start.ts b/packages/medusa/src/commands/start.ts index e01d7f98df..77ff230a24 100644 --- a/packages/medusa/src/commands/start.ts +++ b/packages/medusa/src/commands/start.ts @@ -18,9 +18,11 @@ import { } from "@medusajs/framework/utils" import { MedusaModule } from "@medusajs/framework/modules-sdk" -import { MedusaContainer } from "@medusajs/framework/types" +import { Logger, MedusaContainer } from "@medusajs/framework/types" import { parse } from "url" import loaders, { initializeContainer } from "../loaders" +import { reloadResources } from "./utils/dev-server" +import { HMRReloadError } from "./utils/dev-server/errors" const EVERY_SIXTH_HOUR = "0 */6 * * *" const CRON_SCHEDULE = EVERY_SIXTH_HOUR @@ -163,6 +165,37 @@ function findExpressRoutePath({ return undefined } +function handleHMRReload(logger: Logger) { + // Set up HMR reload handler if running in HMR mode + if (process.env.MEDUSA_HMR_ENABLED === "true" && process.send) { + ;(global as any).__MEDUSA_HMR_ROUTE_REGISTRY__ = true + + process.on("message", async (msg: any) => { + if (msg?.type === "hmr-reload") { + const { action, file, rootDirectory } = msg + + const success = await reloadResources({ + logSource: "[HMR]", + action, + absoluteFilePath: file, + logger, + rootDirectory, + }) + .then(() => true) + .catch((error) => { + if (HMRReloadError.isHMRReloadError(error)) { + return false + } + logger.error("[HMR] Reload failed with unexpected error", error) + return false + }) + + process.send!({ type: "hmr-result", success }) + } + }) + } +} + async function start(args: { directory: string host?: string @@ -171,7 +204,10 @@ async function start(args: { cluster?: string workers?: string servers?: string -}) { +}): Promise<{ + server: GracefulShutdownServer + gracefulShutDown: () => void +} | void> { const { port = 9000, host, @@ -296,7 +332,9 @@ async function start(args: { track("PING") }) - return { server } + handleHMRReload(logger) + + return { server, gracefulShutDown } } catch (err) { logger.error("Error starting server", err) process.exit(1) @@ -357,14 +395,14 @@ async function start(args: { process.env.PLUGIN_ADMIN_UI_SKIP_CACHE = "true" } - await internalStart(!!types && msg.index === 0) + return await internalStart(!!types && msg.index === 0) }) } } else { /** * Not in cluster mode */ - await internalStart(!!types) + return await internalStart(!!types) } } diff --git a/packages/medusa/src/commands/utils/dev-server/errors.ts b/packages/medusa/src/commands/utils/dev-server/errors.ts new file mode 100644 index 0000000000..3b8a951bc1 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/errors.ts @@ -0,0 +1,10 @@ +export class HMRReloadError extends Error { + static isHMRReloadError(error: Error): error is HMRReloadError { + return error instanceof HMRReloadError || error.name === "HMRReloadError" + } + + constructor(message: string) { + super(message) + this.name = "HMRReloadError" + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/index.ts b/packages/medusa/src/commands/utils/dev-server/index.ts new file mode 100644 index 0000000000..0b568d49a0 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/index.ts @@ -0,0 +1,137 @@ +import { container } from "@medusajs/framework" +import { logger } from "@medusajs/framework/logger" +import { ModuleCacheManager } from "./module-cache-manager" +import { RecoveryService } from "./recovery-service" +import { RouteReloader } from "./reloaders/routes" +import { SubscriberReloader } from "./reloaders/subscribers" +import { WorkflowReloader } from "./reloaders/workflows" +import { ResourceRegistry } from "./resource-registry" +import { DevServerGlobals, ReloadParams } from "./types" +import { JobReloader } from "./reloaders/jobs" +import { ModuleReloader } from "./reloaders/modules" +import { HMRReloadError } from "./errors" + +let sharedCacheManager!: ModuleCacheManager +const sharedRegistry = new ResourceRegistry() + +const reloaders = {} as { + routesReloader: RouteReloader + subscribersReloader?: SubscriberReloader + workflowsReloader: WorkflowReloader + jobsReloader?: JobReloader + modulesReloader?: ModuleReloader +} + +function initializeReloaders(logSource: string, rootDirectory: string) { + sharedCacheManager ??= new ModuleCacheManager(logSource) + + const globals = global as unknown as DevServerGlobals + + if (!reloaders.routesReloader) { + const routeReloader = new RouteReloader( + globals.__MEDUSA_HMR_API_LOADER__, + sharedCacheManager, + logSource, + logger + ) + reloaders.routesReloader = routeReloader + } + + if (!reloaders.subscribersReloader) { + const subscriberReloader = new SubscriberReloader( + container, + sharedCacheManager, + sharedRegistry, + logSource, + logger + ) + reloaders.subscribersReloader = subscriberReloader + } + + if (!reloaders.workflowsReloader) { + const workflowReloader = new WorkflowReloader( + globals.WorkflowManager, + sharedCacheManager, + sharedRegistry, + reloadResources, + logSource, + logger, + rootDirectory + ) + reloaders.workflowsReloader = workflowReloader + } + + if (!reloaders.jobsReloader) { + const jobReloader = new JobReloader( + globals.WorkflowManager, + sharedCacheManager, + container, + sharedRegistry, + logSource, + logger + ) + reloaders.jobsReloader = jobReloader + } + + if (!reloaders.modulesReloader) { + const moduleReloader = new ModuleReloader( + sharedCacheManager, + rootDirectory, + logSource, + logger + ) + reloaders.modulesReloader = moduleReloader + } +} + +const unmanagedFiles = ["medusa-config", ".env"] + +/** + * Main entry point for reloading resources (routes, subscribers, workflows, and modules) + * Orchestrates the reload process and handles recovery of broken modules + */ +export async function reloadResources({ + logSource, + action, + absoluteFilePath, + keepCache, + logger, + skipRecovery = false, + rootDirectory, +}: ReloadParams): Promise { + if (unmanagedFiles.some((file) => absoluteFilePath.includes(file))) { + throw new HMRReloadError( + `File ${absoluteFilePath} is not managed by the dev server HMR. Server restart may be required.` + ) + } + + initializeReloaders(logSource, rootDirectory) + + // Reload modules first as other resources might depend on them + await reloaders.modulesReloader?.reload?.(action, absoluteFilePath) + + // Reload in dependency order: workflows → routes → subscribers → jobs + // Jobs depend on workflows, so workflows must be reloaded first + await reloaders.workflowsReloader.reload( + action, + absoluteFilePath, + keepCache, + skipRecovery + ) + await reloaders.routesReloader.reload(action, absoluteFilePath) + await reloaders.subscribersReloader?.reload?.(action, absoluteFilePath) + await reloaders.jobsReloader?.reload?.(action, absoluteFilePath) + + // Attempt recovery of broken modules (unless we're already in recovery mode) + if (!skipRecovery) { + const recoveryService = new RecoveryService( + sharedCacheManager, + reloadResources, + logSource, + logger, + rootDirectory + ) + + await recoveryService.recoverBrokenModules() + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/module-cache-manager.ts b/packages/medusa/src/commands/utils/dev-server/module-cache-manager.ts new file mode 100644 index 0000000000..513e36454d --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/module-cache-manager.ts @@ -0,0 +1,204 @@ +import { Logger } from "@medusajs/framework/types" +import path from "path" +import { CONFIG } from "./types" + +/** + * Manages Node.js require cache operations and tracks broken modules + */ +export class ModuleCacheManager { + constructor(private readonly logSource: string) {} + + private brokenModules: Set = new Set() + + /** + * Check if a module path should be excluded from cache operations + */ + private shouldExcludePath(modulePath: string): boolean { + return CONFIG.EXCLUDED_PATH_PATTERNS.some((pattern) => + modulePath.includes(pattern) + ) + } + + /** + * Clear cache for descendant modules recursively + */ + private async clearDescendantModules( + modulePath: string, + visitedModules: Set, + logger?: Logger, + onClear?: (path: string) => Promise + ): Promise { + if (this.shouldExcludePath(modulePath) || visitedModules.has(modulePath)) { + return + } + + visitedModules.add(modulePath) + + const moduleEntry = require.cache[modulePath] + if (!moduleEntry) { + return + } + + // Recursively clear children first + if (moduleEntry.children) { + for (const child of moduleEntry.children) { + await this.clearDescendantModules( + child.id, + visitedModules, + logger, + onClear + ) + } + } + + delete require.cache[modulePath] + + if (onClear) { + await onClear(modulePath) + } + + this.logCacheClear(modulePath, logger, "Cleared cache") + } + + /** + * Clear cache for parent modules recursively + */ + private async clearParentModules( + targetPath: string, + visitedModules: Set, + logger?: Logger, + onClear?: (path: string) => Promise, + trackBroken: boolean = true + ): Promise { + const parentsToCheck = this.findParentModules(targetPath) + + for (const modulePath of parentsToCheck) { + if (visitedModules.has(modulePath)) { + continue + } + + visitedModules.add(modulePath) + + // Recursively clear parents first + await this.clearParentModules( + modulePath, + visitedModules, + logger, + onClear, + trackBroken + ) + + // Track as potentially broken before deletion + if (trackBroken) { + this.brokenModules.add(modulePath) + } + + delete require.cache[modulePath] + + if (onClear) { + await onClear(modulePath) + } + + this.logCacheClear(modulePath, logger, "Cleared parent cache") + } + } + + /** + * Find all parent modules that depend on the target path + */ + private findParentModules(targetPath: string): Set { + const parents = new Set() + + for (const [modulePath, moduleEntry] of Object.entries(require.cache)) { + if (this.shouldExcludePath(modulePath)) { + continue + } + + if (moduleEntry?.children?.some((child) => child.id === targetPath)) { + parents.add(modulePath) + } + } + + return parents + } + + /** + * Log cache clearing operation + */ + private logCacheClear( + modulePath: string, + logger: Logger | undefined, + message: string + ): void { + if (logger) { + const relativePath = path.relative(process.cwd(), modulePath) + logger.debug(`${this.logSource} ${message}: ${relativePath}`) + } + } + + /** + * Clear require cache for a file and all its parent/descendant modules + */ + async clear( + filePath: string, + logger?: Logger, + onClear?: (modulePath: string) => Promise, + trackBroken: boolean = true + ): Promise { + const absolutePath = path.resolve(filePath) + const visitedModules = new Set() + + // Clear parents first, then descendants + await this.clearParentModules( + absolutePath, + visitedModules, + logger, + onClear, + trackBroken + ) + + await this.clearDescendantModules( + absolutePath, + visitedModules, + logger, + onClear + ) + + if (logger) { + const relativePath = path.relative(process.cwd(), filePath) + logger.info( + `${this.logSource} Cleared ${visitedModules.size} module(s) from cache for ${relativePath}` + ) + } + + return visitedModules.size + } + + /** + * Remove a module from the broken modules set + */ + removeBrokenModule(modulePath: string): void { + this.brokenModules.delete(modulePath) + } + + /** + * Get all broken module paths + */ + getBrokenModules(): string[] { + return Array.from(this.brokenModules) + } + + /** + * Get the count of broken modules + */ + getBrokenModuleCount(): number { + return this.brokenModules.size + } + + /** + * Clear a specific module from require cache + */ + clearSingleModule(modulePath: string): void { + delete require.cache[modulePath] + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/recovery-service.ts b/packages/medusa/src/commands/utils/dev-server/recovery-service.ts new file mode 100644 index 0000000000..3ce14e43a0 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/recovery-service.ts @@ -0,0 +1,88 @@ +import { Logger } from "@medusajs/framework/types" +import { ModuleCacheManager } from "./module-cache-manager" +import path from "path" +import { ReloadParams } from "./types" + +/** + * Handles recovery of broken modules after dependencies are restored + */ +export class RecoveryService { + constructor( + private cacheManager: ModuleCacheManager, + private reloadResources: (params: ReloadParams) => Promise, + private logSource: string, + private logger: Logger, + private rootDirectory: string + ) {} + + /** + * Attempt to recover all broken modules + */ + async recoverBrokenModules(): Promise { + const brokenCount = this.cacheManager.getBrokenModuleCount() + if (!brokenCount) { + return + } + + this.logger.info( + `${this.logSource} Attempting to recover ${brokenCount} broken module(s)` + ) + + const brokenModules = this.cacheManager.getBrokenModules() + + for (const modulePath of brokenModules) { + await this.attemptModuleRecovery(modulePath) + } + + this.logRecoveryResults() + } + + /** + * Attempt to recover a single broken module + */ + private async attemptModuleRecovery(modulePath: string): Promise { + this.cacheManager.clearSingleModule(modulePath) + + const relativePath = path.relative(process.cwd(), modulePath) + this.logger.info(`${this.logSource} Attempting to reload: ${relativePath}`) + + try { + // Attempt reload with skipRecovery=true to prevent infinite recursion + await this.reloadResources({ + logSource: this.logSource, + action: "change", + absoluteFilePath: modulePath, + keepCache: false, + logger: this.logger, + skipRecovery: true, + rootDirectory: this.rootDirectory, + }) + + this.cacheManager.removeBrokenModule(modulePath) + this.logger.info( + `${this.logSource} Successfully recovered: ${relativePath}` + ) + } catch (error) { + this.logger.debug( + `${this.logSource} Could not recover ${relativePath}: ${error}` + ) + } + } + + /** + * Log final recovery results + */ + private logRecoveryResults(): void { + const remainingBroken = this.cacheManager.getBrokenModuleCount() + + if (remainingBroken) { + this.logger.debug( + `${this.logSource} ${remainingBroken} module(s) remain broken. They may recover when additional dependencies are restored.` + ) + } else { + this.logger.info( + `${this.logSource} All broken modules successfully recovered` + ) + } + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/base.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/base.ts new file mode 100644 index 0000000000..0d765590f6 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/base.ts @@ -0,0 +1,54 @@ +import { Logger } from "@medusajs/framework/types" +import { ModuleCacheManager } from "../module-cache-manager" +import { FileChangeAction } from "../types" + +export class BaseReloader { + constructor( + private readonly cacheManager: ModuleCacheManager, + private readonly logSource: string, + private readonly logger: Logger + ) {} + + clearModuleCache(absoluteFilePath: string) { + const resolved = require.resolve(absoluteFilePath) + if (require.cache[resolved]) { + delete require.cache[resolved] + } + } + + async clearParentChildModulesCache( + absoluteFilePath: string, + reloaders: Array<() => Promise>, + reloadResources: (args: { + logSource: string + action: FileChangeAction + absoluteFilePath: string + keepCache: boolean + skipRecovery: boolean + logger: Logger + rootDirectory: string + }) => Promise, + skipRecovery: boolean, + rootDirectory: string + ): Promise { + await this.cacheManager.clear( + absoluteFilePath, + this.logger, + async (modulePath) => { + // Create deferred reloader for each cleared module + reloaders.push(async () => + reloadResources({ + logSource: this.logSource, + action: "change", + absoluteFilePath: modulePath, + keepCache: true, + skipRecovery: true, // handled by the main caller + logger: this.logger, + rootDirectory, + }) + ) + }, + !skipRecovery // Track broken modules unless we're in recovery mode + ) + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/jobs.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/jobs.ts new file mode 100644 index 0000000000..2a4c79990d --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/jobs.ts @@ -0,0 +1,85 @@ +import { JobLoader } from "@medusajs/framework/jobs" +import { Logger, MedusaContainer } from "@medusajs/framework/types" +import { ModuleCacheManager } from "../module-cache-manager" +import { ResourceRegistry } from "../resource-registry" +import { CONFIG, DevServerGlobals, FileChangeAction } from "../types" +import { BaseReloader } from "./base" + +/** + * Metadata for a registered subscriber + */ +interface JobMetadata { + name: string + [key: string]: any +} + +export class JobReloader extends BaseReloader { + #logSource: string + #logger: Logger + + constructor( + private workflowManager: DevServerGlobals["WorkflowManager"], + cacheManager: ModuleCacheManager, + private container: MedusaContainer, + private registry: ResourceRegistry, + logSource: string, + logger: Logger + ) { + super(cacheManager, logSource, logger) + this.#logSource = logSource + this.#logger = logger + } + + /** + * Check if a file path represents a subscriber + */ + private isJobPath(filePath: string): boolean { + return filePath.includes(CONFIG.RESOURCE_PATH_PATTERNS.job) + } + + /** + * Unregister a subscriber from the event-bus + */ + private unregisterJob(metadata: JobMetadata): void { + this.workflowManager?.unregister(metadata.name) + this.#logger.debug(`${this.#logSource} Unregistered job ${metadata.name}`) + } + + /** + * Register a subscriber by loading the file and extracting its metadata + */ + private async registerJob(absoluteFilePath: string) { + const jobLoader = new JobLoader([], this.container) + await jobLoader.loadFile(absoluteFilePath) + this.#logger.debug(`${this.#logSource} Registered job ${absoluteFilePath}`) + } + + /** + * Reload a subscriber file if necessary + */ + async reload( + action: FileChangeAction, + absoluteFilePath: string + ): Promise { + if (!this.isJobPath(absoluteFilePath)) { + return + } + + const existingResources = this.registry.getResources(absoluteFilePath) + if (existingResources) { + for (const [_, resources] of existingResources) { + for (const resource of resources) { + this.unregisterJob({ + name: resource.id, + config: resource.config, + }) + } + } + } + + if (action === "add" || action === "change") { + this.clearModuleCache(absoluteFilePath) + await this.registerJob(absoluteFilePath) + } + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/modules.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/modules.ts new file mode 100644 index 0000000000..7f24844361 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/modules.ts @@ -0,0 +1,265 @@ +import { container, MedusaAppLoader } from "@medusajs/framework" +import { IModuleService, Logger } from "@medusajs/framework/types" +import { + ContainerRegistrationKeys, + dynamicImport, +} from "@medusajs/framework/utils" +import { join, relative } from "path" +import { ModuleCacheManager } from "../module-cache-manager" +import { CONFIG, FileChangeAction } from "../types" +import { BaseReloader } from "./base" +import { HMRReloadError } from "../errors" + +/** + * Handles hot reloading of custom modules in the /modules directory + */ +export class ModuleReloader extends BaseReloader { + #logSource: string + #logger: Logger + #rootDirectory: string + + constructor( + cacheManager: ModuleCacheManager, + rootDirectory: string, + logSource: string, + logger: Logger + ) { + super(cacheManager, logSource, logger) + this.#logSource = logSource + this.#logger = logger + this.#rootDirectory = rootDirectory + } + + /** + * Check if a file path is within a module directory + */ + private isModulePath(filePath: string): boolean { + return filePath.includes("modules/") + } + + /** + * Extract module name from file path + * e.g., "/path/to/project/modules/contact-us/service.ts" -> "contact-us" + */ + private getModuleNameFromPath(filePath: string): string | null { + const modulesPattern = "modules/" + const parts = filePath.split(modulesPattern) + + if (parts.length < 2) { + return null + } + + const afterModules = parts[1] + const moduleName = afterModules.split("/")[0] + + return moduleName || null + } + + /** + * Get the module directory path + */ + private getModuleDirectory(moduleName: string): string { + return join(this.#rootDirectory, "src", "modules", moduleName) + } + + /** + * Get module key and service name from config + */ + private async getModuleInfo(moduleName: string): Promise<{ + moduleKey: string + serviceName: string + } | null> { + try { + const configModule = container.resolve( + ContainerRegistrationKeys.CONFIG_MODULE + ) + + if (!configModule?.modules) { + return null + } + + // Find the module in config + for (const [key, config] of Object.entries(configModule.modules)) { + if (typeof config === "object" && config !== null) { + const resolvedPath = (config as any).resolve + if ( + resolvedPath && + (resolvedPath.includes(`/modules/${moduleName}`) || + resolvedPath === `./modules/${moduleName}`) + ) { + // Load the module to get serviceName from joinerConfig + const moduleDirectory = this.getModuleDirectory(moduleName) + const moduleIndexPath = join(moduleDirectory, "index.ts") + const moduleExports = await dynamicImport(moduleIndexPath) + const moduleService = + moduleExports.service ?? moduleExports.default?.service + + const joinerConfig = + typeof moduleService?.prototype?.__joinerConfig === "function" + ? moduleService.prototype.__joinerConfig() + : moduleService?.prototype?.__joinerConfig + + if (!joinerConfig?.serviceName) { + return null + } + + return { + moduleKey: key, + serviceName: joinerConfig.serviceName, + } + } + } + } + + return null + } catch (error: any) { + this.#logger.warn( + `${this.#logSource} Failed to get module info for "${moduleName}": ${ + error.message + }` + ) + return null + } + } + + /** + * Shutdown a module instance by calling its lifecycle hooks + */ + private async shutdownModule(moduleInstance: any): Promise { + try { + if (moduleInstance?.__hooks?.onApplicationPrepareShutdown) { + await moduleInstance.__hooks.onApplicationPrepareShutdown + .bind(moduleInstance)() + .catch(() => {}) + } + + if (moduleInstance?.__hooks?.onApplicationShutdown) { + await moduleInstance.__hooks.onApplicationShutdown + .bind(moduleInstance)() + .catch(() => {}) + } + } catch (error) { + this.#logger.warn( + `${this.#logSource} Error during module shutdown: ${error.message}` + ) + } + } + + /** + * Clear all module files from require cache + */ + private clearModuleFilesCache(moduleDirectory: string): void { + const relativeModuleDirectory = relative( + this.#rootDirectory, + moduleDirectory + ) + Object.keys(require.cache).forEach((cachedPath) => { + if ( + !CONFIG.EXCLUDED_PATH_PATTERNS.some((pattern) => + cachedPath.includes(pattern) + ) && + cachedPath.includes(relativeModuleDirectory) + ) { + delete require.cache[cachedPath] + } + }) + } + + /** + * Reload a module when its files change + */ + async reload( + action: FileChangeAction, + absoluteFilePath: string + ): Promise { + if (!this.isModulePath(absoluteFilePath)) { + return + } + + const moduleName = this.getModuleNameFromPath(absoluteFilePath) + if (!moduleName) { + this.#logger.warn( + `${ + this.#logSource + } Could not determine module name from path: ${absoluteFilePath}` + ) + return + } + + const relativePath = relative(this.#rootDirectory, absoluteFilePath) + + if (action === "unlink") { + this.#logger.warn( + `${ + this.#logSource + } Module file removed: ${relativePath}. Server restart may be required.` + ) + throw new HMRReloadError( + `Module file removed: ${relativePath}. Server restart may be required.` + ) + } + + if (absoluteFilePath.includes("migrations")) { + this.#logger.warn( + `${ + this.#logSource + } Migrations file changed: ${relativePath}. You may need to apply migrations and restart the server.` + ) + + return + } + + // Get the module information + const moduleInfo = await this.getModuleInfo(moduleName) + if (!moduleInfo) { + this.#logger.warn( + `${this.#logSource} Could not find module config for: ${moduleName}` + ) + return + } + + const { moduleKey, serviceName } = moduleInfo + + this.#logger.info( + `${ + this.#logSource + } Reloading module "${serviceName}" (${moduleName}) due to change in ${relativePath}` + ) + + try { + // Get the current module instance + const moduleInstance = container.resolve(serviceName) as any + + if (moduleInstance) { + // Shutdown the module + await this.shutdownModule(moduleInstance) + } + + const moduleDirectory = this.getModuleDirectory(moduleName) + this.clearModuleFilesCache(moduleDirectory) + + const medusaAppLoader = new MedusaAppLoader() + const newModuleInstance = (await medusaAppLoader.reloadSingleModule({ + moduleKey, + serviceName, + })) as unknown as IModuleService + + if (!newModuleInstance) { + throw new Error(`Failed to reload module "${moduleKey}"`) + } + + this.#logger.info( + `${this.#logSource} Successfully reloaded module "${serviceName}"` + ) + } catch (error: any) { + this.#logger.error( + `${this.#logSource} Failed to reload module "${serviceName}": ${ + error.message + }` + ) + throw new HMRReloadError( + `Failed to reload module "${serviceName}": ${error.message}. Server restart may be required.` + ) + } + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/routes.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/routes.ts new file mode 100644 index 0000000000..a9e017f1f6 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/routes.ts @@ -0,0 +1,69 @@ +import { ApiLoader } from "@medusajs/framework" +import { Logger } from "@medusajs/framework/types" +import { CONFIG, FileChangeAction } from "../types" +import { ModuleCacheManager } from "../module-cache-manager" +import { BaseReloader } from "./base" + +/** + * Handles hot reloading of API resources (routes, middlewares, validators, etc.) + */ +export class RouteReloader extends BaseReloader { + #cacheManager: ModuleCacheManager + #logSource: string + #logger: Logger + + constructor( + private apiLoader: ApiLoader | undefined, + cacheManager: ModuleCacheManager, + logSource: string, + logger: Logger + ) { + super(cacheManager, logSource, logger) + this.#cacheManager = cacheManager + this.#logSource = logSource + this.#logger = logger + } + + /** + * Check if a file path is in the API directory + */ + private isApiPath(filePath: string): boolean { + return filePath.includes(CONFIG.RESOURCE_PATH_PATTERNS.route) + } + + /** + * Reload ALL API resources when any API file changes + * This clears all Express routes/middleware and reloads everything from scratch + */ + async reload( + _action: FileChangeAction, + absoluteFilePath: string + ): Promise { + if (!this.isApiPath(absoluteFilePath)) { + return + } + + if (!this.apiLoader) { + this.#logger.error( + `${this.#logSource} ApiLoader not available - cannot reload API` + ) + return + } + + this.#logger.info( + `${this.#logSource} API change detected: ${absoluteFilePath}` + ) + + await this.#cacheManager.clear( + absoluteFilePath, + this.#logger, + undefined, + false // Don't track as broken since we're intentionally reloading + ) + + this.apiLoader.clearAllResources() + + await this.apiLoader.load() + this.#logger.info(`${this.#logSource} API resources reloaded successfully`) + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/subscribers.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/subscribers.ts new file mode 100644 index 0000000000..f826b716df --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/subscribers.ts @@ -0,0 +1,147 @@ +import { SubscriberLoader } from "@medusajs/framework/subscribers" +import { + IEventBusModuleService, + Logger, + MedusaContainer, +} from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" +import { ResourceRegistry } from "../resource-registry" +import { CONFIG, FileChangeAction } from "../types" +import { BaseReloader } from "./base" +import { ModuleCacheManager } from "../module-cache-manager" + +/** + * Metadata for a registered subscriber + */ +interface SubscriberMetadata { + subscriberId: string + events: string[] +} + +/** + * Handles hot reloading of subscriber files with event-bus unregistration + */ +export class SubscriberReloader extends BaseReloader { + #eventBusService: IEventBusModuleService | undefined + #logSource: string + #logger: Logger + + constructor( + private container: MedusaContainer, + cacheManager: ModuleCacheManager, + private registry: ResourceRegistry, + logSource: string, + logger: Logger + ) { + super(cacheManager, logSource, logger) + this.#logSource = logSource + this.#logger = logger + this.#eventBusService = container.resolve(Modules.EVENT_BUS, { + allowUnregistered: true, + }) as IEventBusModuleService + } + + /** + * Check if a file path represents a subscriber + */ + private isSubscriberPath(filePath: string): boolean { + return filePath.includes(CONFIG.RESOURCE_PATH_PATTERNS.subscriber) + } + + /** + * Unregister a subscriber from the event-bus + */ + private unregisterSubscriber(metadata: SubscriberMetadata): void { + if (!this.#eventBusService) { + return + } + + for (const event of metadata.events) { + // Create a dummy subscriber function - the event-bus will use subscriberId to find the real one + const dummySubscriber = async () => {} + ;(dummySubscriber as any).subscriberId = metadata.subscriberId + + this.#eventBusService.unsubscribe(event, dummySubscriber as any, { + subscriberId: metadata.subscriberId, + }) + } + + this.#logger.debug( + `${this.#logSource} Unregistered subscriber ${ + metadata.subscriberId + } from events: ${metadata.events.join(", ")}` + ) + } + + /** + * Register a subscriber by loading the file and extracting its metadata + */ + private registerSubscriber(absoluteFilePath: string): void { + if (!this.#eventBusService) { + return + } + + try { + // Load the subscriber module + const subscriberModule = require(absoluteFilePath) + + new SubscriberLoader( + absoluteFilePath, + {}, + this.container + ).createSubscriber({ + fileName: absoluteFilePath, + config: subscriberModule.config, + handler: subscriberModule.default, + }) + + this.#logger.debug( + `${this.#logSource} Registered subscriber ${absoluteFilePath}` + ) + } catch (error) { + this.#logger.error( + `${ + this.#logSource + } Failed to register subscriber from ${absoluteFilePath}: ${error}` + ) + } + } + + /** + * Reload a subscriber file if necessary + */ + async reload( + action: FileChangeAction, + absoluteFilePath: string + ): Promise { + if (!this.isSubscriberPath(absoluteFilePath)) { + return + } + + if (!this.#eventBusService) { + this.#logger.error( + `${ + this.#logSource + } EventBusService not available - cannot reload subscribers` + ) + return + } + + const existingResources = this.registry.getResources(absoluteFilePath) + if (existingResources) { + for (const [_, resources] of existingResources) { + for (const resource of resources) { + this.unregisterSubscriber({ + subscriberId: resource.id, + events: resource.events, + }) + } + } + } + + if (action === "add" || action === "change") { + this.clearModuleCache(absoluteFilePath) + this.registerSubscriber(absoluteFilePath) + } + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/reloaders/workflows.ts b/packages/medusa/src/commands/utils/dev-server/reloaders/workflows.ts new file mode 100644 index 0000000000..7e616e99d6 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/reloaders/workflows.ts @@ -0,0 +1,178 @@ +import { Logger } from "@medusajs/framework/types" +import { ModuleCacheManager } from "../module-cache-manager" +import { ResourceRegistry } from "../resource-registry" +import { + CONFIG, + DevServerGlobals, + ReloadParams, + FileChangeAction, +} from "../types" +import { ResourceEntry, ResourceMap } from "@medusajs/framework/utils" +import { BaseReloader } from "./base" + +/** + * Handles hot reloading of workflow and step files + */ +export class WorkflowReloader extends BaseReloader { + #logSource: string + #logger: Logger + #rootDirectory: string + + constructor( + private workflowManager: DevServerGlobals["WorkflowManager"], + cacheManager: ModuleCacheManager, + private registry: ResourceRegistry, + private reloadResources: (params: ReloadParams) => Promise, + logSource: string, + logger: Logger, + rootDirectory: string + ) { + super(cacheManager, logSource, logger) + this.#logSource = logSource + this.#logger = logger + this.#rootDirectory = rootDirectory + } + + /** + * Check if a file path represents a workflow + */ + private isWorkflowPath(filePath: string): boolean { + return filePath.includes(CONFIG.RESOURCE_PATH_PATTERNS.workflow) + } + + /** + * Reload a workflow file if necessary + */ + async reload( + action: FileChangeAction, + absoluteFilePath: string, + keepCache: boolean = false, + skipRecovery: boolean = false + ): Promise { + if (!this.isWorkflowPath(absoluteFilePath)) { + return + } + + if (!this.workflowManager) { + this.#logger.error( + `${ + this.#logSource + } WorkflowManager not available - cannot reload workflows` + ) + return + } + + const requirableWorkflowPaths = new Set() + const reloaders: Array<() => Promise> = [] + + // Unregister resources and collect affected workflows + this.unregisterResources(absoluteFilePath, requirableWorkflowPaths) + + if (!keepCache) { + await this.clearParentChildModulesCache( + absoluteFilePath, + reloaders, + this.reloadResources, + skipRecovery, + this.#rootDirectory + ) + } + + this.clearModuleCache(absoluteFilePath) + + // Reload workflows that were affected + if (action !== "unlink") { + this.reloadWorkflowModules(requirableWorkflowPaths, absoluteFilePath) + } + + // Execute deferred reloaders + if (reloaders.length) { + await Promise.all(reloaders.map(async (reloader) => reloader())) + } + } + + /** + * Unregister workflow and step resources + */ + private unregisterResources( + absoluteFilePath: string, + affectedWorkflows: Set + ): void { + const resources = this.registry.getResources(absoluteFilePath) + if (!resources) { + return + } + + for (const [type, resourceList] of resources.entries()) { + for (const resource of resourceList) { + if (type === "workflow") { + this.workflowManager!.unregister(resource.id) + } else if (type === "step") { + this.handleStepUnregister(resource, affectedWorkflows) + } + } + } + } + + /** + * Handle unregistering a step and find affected workflows + */ + private handleStepUnregister( + stepResource: ResourceEntry, + affectedWorkflows: Set + ): void { + const workflowSourcePaths = this.registry.getWorkflowSourcePaths( + stepResource.id + ) + + if (!workflowSourcePaths) { + return + } + + for (const sourcePath of workflowSourcePaths) { + const workflowResources = this.registry.getResources(sourcePath) + if (!workflowResources) { + continue + } + + this.unregisterWorkflowsInResource( + workflowResources, + affectedWorkflows, + sourcePath + ) + } + } + + /** + * Unregister workflows found in a resource and track their paths + */ + private unregisterWorkflowsInResource( + workflowResources: ResourceMap, + affectedWorkflows: Set, + sourcePath: string + ): void { + for (const [type, resourceList] of workflowResources.entries()) { + if (type !== "workflow") { + continue + } + + for (const workflow of resourceList) { + this.workflowManager!.unregister(workflow.id) + affectedWorkflows.add(sourcePath) + } + } + } + + /** + * Reload workflow modules using require + */ + private reloadWorkflowModules( + workflowPaths: Set, + mainFilePath: string + ): void { + for (const workflowPath of workflowPaths) { + require(workflowPath) + } + require(mainFilePath) + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/resource-registry.ts b/packages/medusa/src/commands/utils/dev-server/resource-registry.ts new file mode 100644 index 0000000000..6fc81f7a02 --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/resource-registry.ts @@ -0,0 +1,22 @@ +import { + globalDevServerRegistry, + inverseDevServerRegistry, + ResourceMap, + ResourcePath, +} from "@medusajs/framework/utils" + +export class ResourceRegistry { + /** + * Get resources registered for a given file path + */ + getResources(filePath: string): ResourceMap | undefined { + return globalDevServerRegistry.get(filePath) + } + + /** + * Get workflow source paths for a step resource + */ + getWorkflowSourcePaths(stepId: string): ResourcePath[] | undefined { + return inverseDevServerRegistry.get(`step:${stepId}`) + } +} diff --git a/packages/medusa/src/commands/utils/dev-server/types.ts b/packages/medusa/src/commands/utils/dev-server/types.ts new file mode 100644 index 0000000000..7650065baa --- /dev/null +++ b/packages/medusa/src/commands/utils/dev-server/types.ts @@ -0,0 +1,56 @@ +import { ApiLoader } from "@medusajs/framework" +import { Logger } from "@medusajs/framework/types" + +/** + * Action types that can be performed on files + */ +export type FileChangeAction = "add" | "change" | "unlink" + +/** + * Configuration for path matching and exclusions + */ +export const CONFIG = { + EXCLUDED_PATH_PATTERNS: ["node_modules"], + RESOURCE_PATH_PATTERNS: { + route: "api/", + workflow: "workflows/", + subscriber: "subscribers/", + job: "jobs/", + module: "modules/", + }, +} as const + +/** + * Global dependencies available in the dev server environment + */ +export interface DevServerGlobals { + __MEDUSA_HMR_API_LOADER__?: ApiLoader + __MEDUSA_HMR_INITIAL_STACK_LENGTH__?: number + WorkflowManager?: { + unregister: (id: string) => void + } +} + +/** + * Parameters for resource reload operations + */ +export interface ReloadParams { + /** + * The source of the log, used to prefix the log messages + */ + logSource: string + action: FileChangeAction + absoluteFilePath: string + keepCache?: boolean + logger: Logger + skipRecovery?: boolean + rootDirectory: string +} + +/** + * Represents a resource registered in the dev server + */ +export interface Resource { + id: string + [key: string]: any +} diff --git a/packages/medusa/src/feature-flags/backend-hmr.ts b/packages/medusa/src/feature-flags/backend-hmr.ts new file mode 100644 index 0000000000..952a044524 --- /dev/null +++ b/packages/medusa/src/feature-flags/backend-hmr.ts @@ -0,0 +1,13 @@ +import { FlagSettings } from "@medusajs/framework/feature-flags" + +const BackendHmrFeatureFlag: FlagSettings = { + key: "backend_hmr", + default_val: false, + env_key: "MEDUSA_FF_BACKEND_HMR", + description: + "Enable experimental Hot Module Replacement (HMR) for backend development. " + + "When enabled, route, middleware, workflows and steps changes reload in <10ms without restarting the server. " + + "Database connections and container state persist across reloads.", +} + +export default BackendHmrFeatureFlag diff --git a/packages/medusa/src/loaders/api.ts b/packages/medusa/src/loaders/api.ts index 16c30e8364..84da414c87 100644 --- a/packages/medusa/src/loaders/api.ts +++ b/packages/medusa/src/loaders/api.ts @@ -1,6 +1,7 @@ import { ConfigModule } from "@medusajs/framework/config" import { ApiLoader } from "@medusajs/framework/http" import { MedusaContainer, PluginDetails } from "@medusajs/framework/types" +import { FeatureFlag } from "@medusajs/framework/utils" import { Express } from "express" import { join } from "path" import qs from "qs" @@ -24,6 +25,12 @@ export default async ({ app, container, plugins }: Options) => { next() }) + // Store the initial router stack length before loading API resources for HMR + if (FeatureFlag.isFeatureEnabled("backend_hmr")) { + const initialStackLength = (app as any)._router?.stack?.length ?? 0 + ;(global as any).__MEDUSA_HMR_INITIAL_STACK_LENGTH__ = initialStackLength + } + const sourcePaths: string[] = [] /** diff --git a/packages/modules/event-bus-local/src/services/event-bus-local.ts b/packages/modules/event-bus-local/src/services/event-bus-local.ts index a61e6f78bc..f2cc814d7e 100644 --- a/packages/modules/event-bus-local/src/services/event-bus-local.ts +++ b/packages/modules/event-bus-local/src/services/event-bus-local.ts @@ -172,7 +172,10 @@ export default class LocalEventBusService extends AbstractEventBusModuleService ): this { super.subscribe(event, subscriber, context) - this.eventEmitter_.on(event, async (data: Event) => { + const subscriberId = + context?.subscriberId ?? (subscriber as any).subscriberId + + const wrappedSubscriber = async (data: Event) => { try { await subscriber(data) } catch (err) { @@ -181,7 +184,13 @@ export default class LocalEventBusService extends AbstractEventBusModuleService ) this.logger_?.error(err) } - }) + } + + if (subscriberId) { + ;(wrappedSubscriber as any).subscriberId = subscriberId + } + + this.eventEmitter_.on(event, wrappedSubscriber) return this } @@ -193,7 +202,22 @@ export default class LocalEventBusService extends AbstractEventBusModuleService ): this { super.unsubscribe(event, subscriber, context) - this.eventEmitter_.off(event, subscriber) + const subscriberId = + context?.subscriberId ?? (subscriber as any).subscriberId + + if (subscriberId) { + const listeners = this.eventEmitter_.listeners(event) + const wrappedSubscriber = listeners.find( + (listener) => (listener as any).subscriberId === subscriberId + ) + + if (wrappedSubscriber) { + this.eventEmitter_.off(event, wrappedSubscriber as any) + } + } else { + this.eventEmitter_.off(event, subscriber) + } + return this } } diff --git a/yarn.lock b/yarn.lock index 42e2bff0b4..865118e3c5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3603,7 +3603,6 @@ __metadata: "@medusajs/cli": 2.12.1 connect-dynamodb: ^3.0.5 ioredis: ^5.4.1 - vite: ^5.4.21 peerDependenciesMeta: "@aws-sdk/client-dynamodb": optional: true @@ -3805,7 +3804,7 @@ __metadata: "@medusajs/workflow-engine-redis": 2.12.1 boxen: ^5.0.1 chalk: ^4.1.2 - chokidar: ^3.5.3 + chokidar: ^4.0.3 compression: ^1.8.0 express: ^4.21.0 fs-exists-cached: ^1.0.0