diff --git a/.changeset/tender-clocks-talk.md b/.changeset/tender-clocks-talk.md new file mode 100644 index 0000000000..f2cfbbe35c --- /dev/null +++ b/.changeset/tender-clocks-talk.md @@ -0,0 +1,7 @@ +--- +"@medusajs/link-modules": patch +"@medusajs/cli": patch +"@medusajs/medusa": patch +--- + +chore(link-modules): ignore duplicates link creation diff --git a/packages/cli/medusa-cli/src/create-cli.ts b/packages/cli/medusa-cli/src/create-cli.ts index 18a9156c51..ddee9be4f9 100644 --- a/packages/cli/medusa-cli/src/create-cli.ts +++ b/packages/cli/medusa-cli/src/create-cli.ts @@ -196,6 +196,10 @@ function buildLocalCommands(cli, isLocalProject) { describe: "Skip prompts and execute only safe actions from sync links", }) + builder.option("concurrency", { + type: "number", + describe: "Number of concurrent migrations to run", + }) }, handler: handlerP( getCommandHandler("db/migrate", (args, cmd) => { @@ -270,6 +274,10 @@ function buildLocalCommands(cli, isLocalProject) { type: "boolean", describe: "Skip prompts and execute only safe actions", }) + builder.option("concurrency", { + type: "number", + describe: "Number of concurrent migrations to run", + }) }, handler: handlerP( getCommandHandler("db/sync-links", (args, cmd) => { diff --git a/packages/medusa/src/commands/db/migrate.ts b/packages/medusa/src/commands/db/migrate.ts index ac6943393b..95e61fd7ea 100644 --- a/packages/medusa/src/commands/db/migrate.ts +++ b/packages/medusa/src/commands/db/migrate.ts @@ -3,13 +3,14 @@ import { LinkLoader } from "@medusajs/framework/links" import { ContainerRegistrationKeys, getResolvedPlugins, + isDefined, mergePluginModules, } from "@medusajs/framework/utils" import { Logger, MedusaContainer } from "@medusajs/types" import { fork } from "child_process" import path, { join } from "path" import { initializeContainer } from "../../loaders" -import { ensureDbExists } from "../utils" +import { ensureDbExists, isPgstreamEnabled } from "../utils" import { syncLinks } from "./sync-links" const TERMINAL_SIZE = process.stdout.columns @@ -26,6 +27,7 @@ export async function migrate({ skipScripts, executeAllLinks, executeSafeLinks, + concurrency, logger, container, }: { @@ -34,6 +36,7 @@ export async function migrate({ skipScripts: boolean executeAllLinks: boolean executeSafeLinks: boolean + concurrency?: number logger: Logger container: MedusaContainer }): Promise { @@ -43,6 +46,16 @@ export async function migrate({ await ensureDbExists(container) + // If pgstream is enabled, force concurrency to 1 + const pgstreamEnabled = await isPgstreamEnabled(container) + if (pgstreamEnabled) { + concurrency = 1 + } + + if (isDefined(concurrency)) { + process.env.DB_MIGRATION_CONCURRENCY = String(concurrency) + } + const medusaAppLoader = new MedusaAppLoader() const configModule = container.resolve( ContainerRegistrationKeys.CONFIG_MODULE @@ -80,6 +93,7 @@ export async function migrate({ executeSafe: executeSafeLinks, directory, container, + concurrency, }) } @@ -112,6 +126,7 @@ const main = async function ({ skipScripts, executeAllLinks, executeSafeLinks, + concurrency, }) { const container = await initializeContainer(directory) const logger = container.resolve(ContainerRegistrationKeys.LOGGER) @@ -123,6 +138,7 @@ const main = async function ({ skipScripts, executeAllLinks, executeSafeLinks, + concurrency, logger, container, }) diff --git a/packages/medusa/src/commands/db/sync-links.ts b/packages/medusa/src/commands/db/sync-links.ts index 825a54248c..7cf9084123 100644 --- a/packages/medusa/src/commands/db/sync-links.ts +++ b/packages/medusa/src/commands/db/sync-links.ts @@ -9,6 +9,7 @@ import { import { ContainerRegistrationKeys, getResolvedPlugins, + isDefined, mergePluginModules, } from "@medusajs/framework/utils" import boxen from "boxen" @@ -16,7 +17,7 @@ import chalk from "chalk" import { join } from "path" import { initializeContainer } from "../../loaders" -import { ensureDbExists } from "../utils" +import { ensureDbExists, isPgstreamEnabled } from "../utils" /** * Groups action tables by their "action" property @@ -102,15 +103,27 @@ export async function syncLinks( executeSafe, directory, container, + concurrency, }: { executeSafe: boolean executeAll: boolean directory: string container: MedusaContainer + concurrency?: number } ) { const logger = container.resolve(ContainerRegistrationKeys.LOGGER) + // Check if pgstream is enabled - if so, force concurrency to 1 + const pgstreamEnabled = await isPgstreamEnabled(container) + if (pgstreamEnabled) { + concurrency = 1 + } + + if (isDefined(concurrency)) { + process.env.DB_MIGRATION_CONCURRENCY = String(concurrency) + } + const planner = await medusaAppLoader.getLinksExecutionPlanner() logger.info("Syncing links...") @@ -192,7 +205,12 @@ export async function syncLinks( } } -const main = async function ({ directory, executeSafe, executeAll }) { +const main = async function ({ + directory, + executeSafe, + executeAll, + concurrency, +}) { const container = await initializeContainer(directory) const logger = container.resolve(ContainerRegistrationKeys.LOGGER) @@ -218,6 +236,7 @@ const main = async function ({ directory, executeSafe, executeAll }) { executeSafe, directory, container, + concurrency, }) process.exit() } catch (error) { diff --git a/packages/medusa/src/commands/utils/index.ts b/packages/medusa/src/commands/utils/index.ts index 08c5512d77..9d2efb4256 100644 --- a/packages/medusa/src/commands/utils/index.ts +++ b/packages/medusa/src/commands/utils/index.ts @@ -19,3 +19,21 @@ export async function ensureDbExists(container: MedusaContainer) { process.exit(1) } } + +export async function isPgstreamEnabled( + container: MedusaContainer +): Promise { + const pgConnection = container.resolve( + ContainerRegistrationKeys.PG_CONNECTION + ) + + try { + const result = await pgConnection.raw( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgstream'" + ) + return result.rows.length > 0 + } catch (error) { + // If there's an error checking, assume pgstream is not enabled + return false + } +} diff --git a/packages/modules/link-modules/src/migration/index.ts b/packages/modules/link-modules/src/migration/index.ts index ce96564f06..cf19051158 100644 --- a/packages/modules/link-modules/src/migration/index.ts +++ b/packages/modules/link-modules/src/migration/index.ts @@ -6,18 +6,18 @@ import { PlannerActionLinkDescriptor, } from "@medusajs/framework/types" -import { - arrayDifference, - DALUtils, - ModulesSdkUtils, - normalizeMigrationSQL, - promiseAll, -} from "@medusajs/framework/utils" import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core" import { DatabaseSchema, PostgreSqlDriver, } from "@medusajs/framework/mikro-orm/postgresql" +import { + arrayDifference, + DALUtils, + executeWithConcurrency, + ModulesSdkUtils, + normalizeMigrationSQL, +} from "@medusajs/framework/utils" import { generateEntity } from "../utils" /** @@ -195,7 +195,7 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner { INSERT INTO "${ this.tableName - }" (table_name, link_descriptor) VALUES (?, ?); + }" (table_name, link_descriptor) VALUES (?, ?) ON CONFLICT DO NOTHING; ${sql} `, @@ -513,22 +513,30 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner { async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise { const orm = await this.createORM() - await promiseAll( - actionPlan.map(async (action) => { - switch (action.action) { - case "delete": - return await this.dropLinkTable(orm, action.tableName) - case "create": - return await this.createLinkTable(orm, action) - case "update": - const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${ - action.sql - }` - return await orm.em.getDriver().getConnection().execute(sql) - default: - return - } - }) - ).finally(() => orm.close(true)) + try { + const concurrency = parseInt(process.env.DB_MIGRATION_CONCURRENCY ?? "1") + await executeWithConcurrency( + actionPlan.map((action) => { + return async () => { + switch (action.action) { + case "delete": + return await this.dropLinkTable(orm, action.tableName) + case "create": + return await this.createLinkTable(orm, action) + case "update": + const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${ + action.sql + }` + return await orm.em.getDriver().getConnection().execute(sql) + default: + return + } + } + }), + concurrency + ) + } finally { + await orm.close(true) + } } }