From 5b7e3c0e7649a3a61b0a4fe796f37e5f7fbe9a3a Mon Sep 17 00:00:00 2001
From: "Carlos R. L. Rodrigues"
<37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Date: Thu, 4 Dec 2025 08:30:30 -0300
Subject: [PATCH] chore: configurable database migration in concurrency
(#14004)
> [!NOTE]
> Adds a configurable concurrency for link migrations (CLI/commands), forces concurrency=1 when pgstream is detected, and ignores duplicate link-table inserts.
>
> - **CLI**
> - Add `--concurrency` option to `db:migrate` and `db:sync-links`.
> - **Medusa commands**
> - `migrate` and `sync-links` accept `concurrency`; set `DB_MIGRATION_CONCURRENCY` and force `1` when `pgstream` schema exists via new `isPgstreamEnabled`.
> - **Link Modules (migrations)**
> - Execute plan actions with `executeWithConcurrency` using `DB_MIGRATION_CONCURRENCY`.
> - Make link-table tracking inserts idempotent with `ON CONFLICT DO NOTHING` (including bulk/upsert and per-create).
>
> Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 07432293c8fe8de30b07920fa47823b9081edacc. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).
---
.changeset/tender-clocks-talk.md | 7 +++
packages/cli/medusa-cli/src/create-cli.ts | 8 +++
packages/medusa/src/commands/db/migrate.ts | 18 +++++-
packages/medusa/src/commands/db/sync-links.ts | 23 +++++++-
packages/medusa/src/commands/utils/index.ts | 18 ++++++
.../link-modules/src/migration/index.ts | 58 +++++++++++--------
6 files changed, 104 insertions(+), 28 deletions(-)
create mode 100644 .changeset/tender-clocks-talk.md
diff --git a/.changeset/tender-clocks-talk.md b/.changeset/tender-clocks-talk.md
new file mode 100644
index 0000000000..f2cfbbe35c
--- /dev/null
+++ b/.changeset/tender-clocks-talk.md
@@ -0,0 +1,7 @@
+---
+"@medusajs/link-modules": patch
+"@medusajs/cli": patch
+"@medusajs/medusa": patch
+---
+
+chore(link-modules): ignore duplicates link creation
diff --git a/packages/cli/medusa-cli/src/create-cli.ts b/packages/cli/medusa-cli/src/create-cli.ts
index 18a9156c51..ddee9be4f9 100644
--- a/packages/cli/medusa-cli/src/create-cli.ts
+++ b/packages/cli/medusa-cli/src/create-cli.ts
@@ -196,6 +196,10 @@ function buildLocalCommands(cli, isLocalProject) {
describe:
"Skip prompts and execute only safe actions from sync links",
})
+ builder.option("concurrency", {
+ type: "number",
+ describe: "Number of concurrent migrations to run",
+ })
},
handler: handlerP(
getCommandHandler("db/migrate", (args, cmd) => {
@@ -270,6 +274,10 @@ function buildLocalCommands(cli, isLocalProject) {
type: "boolean",
describe: "Skip prompts and execute only safe actions",
})
+ builder.option("concurrency", {
+ type: "number",
+ describe: "Number of concurrent migrations to run",
+ })
},
handler: handlerP(
getCommandHandler("db/sync-links", (args, cmd) => {
diff --git a/packages/medusa/src/commands/db/migrate.ts b/packages/medusa/src/commands/db/migrate.ts
index ac6943393b..95e61fd7ea 100644
--- a/packages/medusa/src/commands/db/migrate.ts
+++ b/packages/medusa/src/commands/db/migrate.ts
@@ -3,13 +3,14 @@ import { LinkLoader } from "@medusajs/framework/links"
import {
ContainerRegistrationKeys,
getResolvedPlugins,
+ isDefined,
mergePluginModules,
} from "@medusajs/framework/utils"
import { Logger, MedusaContainer } from "@medusajs/types"
import { fork } from "child_process"
import path, { join } from "path"
import { initializeContainer } from "../../loaders"
-import { ensureDbExists } from "../utils"
+import { ensureDbExists, isPgstreamEnabled } from "../utils"
import { syncLinks } from "./sync-links"
const TERMINAL_SIZE = process.stdout.columns
@@ -26,6 +27,7 @@ export async function migrate({
skipScripts,
executeAllLinks,
executeSafeLinks,
+ concurrency,
logger,
container,
}: {
@@ -34,6 +36,7 @@ export async function migrate({
skipScripts: boolean
executeAllLinks: boolean
executeSafeLinks: boolean
+ concurrency?: number
logger: Logger
container: MedusaContainer
}): Promise {
@@ -43,6 +46,16 @@ export async function migrate({
await ensureDbExists(container)
+ // If pgstream is enabled, force concurrency to 1
+ const pgstreamEnabled = await isPgstreamEnabled(container)
+ if (pgstreamEnabled) {
+ concurrency = 1
+ }
+
+ if (isDefined(concurrency)) {
+ process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
+ }
+
const medusaAppLoader = new MedusaAppLoader()
const configModule = container.resolve(
ContainerRegistrationKeys.CONFIG_MODULE
@@ -80,6 +93,7 @@ export async function migrate({
executeSafe: executeSafeLinks,
directory,
container,
+ concurrency,
})
}
@@ -112,6 +126,7 @@ const main = async function ({
skipScripts,
executeAllLinks,
executeSafeLinks,
+ concurrency,
}) {
const container = await initializeContainer(directory)
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
@@ -123,6 +138,7 @@ const main = async function ({
skipScripts,
executeAllLinks,
executeSafeLinks,
+ concurrency,
logger,
container,
})
diff --git a/packages/medusa/src/commands/db/sync-links.ts b/packages/medusa/src/commands/db/sync-links.ts
index 825a54248c..7cf9084123 100644
--- a/packages/medusa/src/commands/db/sync-links.ts
+++ b/packages/medusa/src/commands/db/sync-links.ts
@@ -9,6 +9,7 @@ import {
import {
ContainerRegistrationKeys,
getResolvedPlugins,
+ isDefined,
mergePluginModules,
} from "@medusajs/framework/utils"
import boxen from "boxen"
@@ -16,7 +17,7 @@ import chalk from "chalk"
import { join } from "path"
import { initializeContainer } from "../../loaders"
-import { ensureDbExists } from "../utils"
+import { ensureDbExists, isPgstreamEnabled } from "../utils"
/**
* Groups action tables by their "action" property
@@ -102,15 +103,27 @@ export async function syncLinks(
executeSafe,
directory,
container,
+ concurrency,
}: {
executeSafe: boolean
executeAll: boolean
directory: string
container: MedusaContainer
+ concurrency?: number
}
) {
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
+ // Check if pgstream is enabled - if so, force concurrency to 1
+ const pgstreamEnabled = await isPgstreamEnabled(container)
+ if (pgstreamEnabled) {
+ concurrency = 1
+ }
+
+ if (isDefined(concurrency)) {
+ process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
+ }
+
const planner = await medusaAppLoader.getLinksExecutionPlanner()
logger.info("Syncing links...")
@@ -192,7 +205,12 @@ export async function syncLinks(
}
}
-const main = async function ({ directory, executeSafe, executeAll }) {
+const main = async function ({
+ directory,
+ executeSafe,
+ executeAll,
+ concurrency,
+}) {
const container = await initializeContainer(directory)
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
@@ -218,6 +236,7 @@ const main = async function ({ directory, executeSafe, executeAll }) {
executeSafe,
directory,
container,
+ concurrency,
})
process.exit()
} catch (error) {
diff --git a/packages/medusa/src/commands/utils/index.ts b/packages/medusa/src/commands/utils/index.ts
index 08c5512d77..9d2efb4256 100644
--- a/packages/medusa/src/commands/utils/index.ts
+++ b/packages/medusa/src/commands/utils/index.ts
@@ -19,3 +19,21 @@ export async function ensureDbExists(container: MedusaContainer) {
process.exit(1)
}
}
+
+export async function isPgstreamEnabled(
+ container: MedusaContainer
+): Promise {
+ const pgConnection = container.resolve(
+ ContainerRegistrationKeys.PG_CONNECTION
+ )
+
+ try {
+ const result = await pgConnection.raw(
+ "SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgstream'"
+ )
+ return result.rows.length > 0
+ } catch (error) {
+ // If there's an error checking, assume pgstream is not enabled
+ return false
+ }
+}
diff --git a/packages/modules/link-modules/src/migration/index.ts b/packages/modules/link-modules/src/migration/index.ts
index ce96564f06..cf19051158 100644
--- a/packages/modules/link-modules/src/migration/index.ts
+++ b/packages/modules/link-modules/src/migration/index.ts
@@ -6,18 +6,18 @@ import {
PlannerActionLinkDescriptor,
} from "@medusajs/framework/types"
-import {
- arrayDifference,
- DALUtils,
- ModulesSdkUtils,
- normalizeMigrationSQL,
- promiseAll,
-} from "@medusajs/framework/utils"
import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core"
import {
DatabaseSchema,
PostgreSqlDriver,
} from "@medusajs/framework/mikro-orm/postgresql"
+import {
+ arrayDifference,
+ DALUtils,
+ executeWithConcurrency,
+ ModulesSdkUtils,
+ normalizeMigrationSQL,
+} from "@medusajs/framework/utils"
import { generateEntity } from "../utils"
/**
@@ -195,7 +195,7 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
INSERT INTO "${
this.tableName
- }" (table_name, link_descriptor) VALUES (?, ?);
+ }" (table_name, link_descriptor) VALUES (?, ?) ON CONFLICT DO NOTHING;
${sql}
`,
@@ -513,22 +513,30 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise {
const orm = await this.createORM()
- await promiseAll(
- actionPlan.map(async (action) => {
- switch (action.action) {
- case "delete":
- return await this.dropLinkTable(orm, action.tableName)
- case "create":
- return await this.createLinkTable(orm, action)
- case "update":
- const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
- action.sql
- }`
- return await orm.em.getDriver().getConnection().execute(sql)
- default:
- return
- }
- })
- ).finally(() => orm.close(true))
+ try {
+ const concurrency = parseInt(process.env.DB_MIGRATION_CONCURRENCY ?? "1")
+ await executeWithConcurrency(
+ actionPlan.map((action) => {
+ return async () => {
+ switch (action.action) {
+ case "delete":
+ return await this.dropLinkTable(orm, action.tableName)
+ case "create":
+ return await this.createLinkTable(orm, action)
+ case "update":
+ const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
+ action.sql
+ }`
+ return await orm.em.getDriver().getConnection().execute(sql)
+ default:
+ return
+ }
+ }
+ }),
+ concurrency
+ )
+ } finally {
+ await orm.close(true)
+ }
}
}