chore: configurable database migration in concurrency (#14004)

> [!NOTE]
> Adds a configurable concurrency for link migrations (CLI/commands), forces concurrency=1 when pgstream is detected, and ignores duplicate link-table inserts.
> 
> - **CLI**
>   - Add `--concurrency` option to `db:migrate` and `db:sync-links`.
> - **Medusa commands**
>   - `migrate` and `sync-links` accept `concurrency`; set `DB_MIGRATION_CONCURRENCY` and force `1` when `pgstream` schema exists via new `isPgstreamEnabled`.
> - **Link Modules (migrations)**
>   - Execute plan actions with `executeWithConcurrency` using `DB_MIGRATION_CONCURRENCY`.
>   - Make link-table tracking inserts idempotent with `ON CONFLICT DO NOTHING` (including bulk/upsert and per-create).
> 
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 07432293c8fe8de30b07920fa47823b9081edacc. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
This commit is contained in:
Carlos R. L. Rodrigues
2025-12-04 08:30:30 -03:00
committed by GitHub
parent a33ef1e4d9
commit 5b7e3c0e76
6 changed files with 104 additions and 28 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/link-modules": patch
"@medusajs/cli": patch
"@medusajs/medusa": patch
---
chore(link-modules): ignore duplicates link creation

View File

@@ -196,6 +196,10 @@ function buildLocalCommands(cli, isLocalProject) {
describe:
"Skip prompts and execute only safe actions from sync links",
})
builder.option("concurrency", {
type: "number",
describe: "Number of concurrent migrations to run",
})
},
handler: handlerP(
getCommandHandler("db/migrate", (args, cmd) => {
@@ -270,6 +274,10 @@ function buildLocalCommands(cli, isLocalProject) {
type: "boolean",
describe: "Skip prompts and execute only safe actions",
})
builder.option("concurrency", {
type: "number",
describe: "Number of concurrent migrations to run",
})
},
handler: handlerP(
getCommandHandler("db/sync-links", (args, cmd) => {

View File

@@ -3,13 +3,14 @@ import { LinkLoader } from "@medusajs/framework/links"
import {
ContainerRegistrationKeys,
getResolvedPlugins,
isDefined,
mergePluginModules,
} from "@medusajs/framework/utils"
import { Logger, MedusaContainer } from "@medusajs/types"
import { fork } from "child_process"
import path, { join } from "path"
import { initializeContainer } from "../../loaders"
import { ensureDbExists } from "../utils"
import { ensureDbExists, isPgstreamEnabled } from "../utils"
import { syncLinks } from "./sync-links"
const TERMINAL_SIZE = process.stdout.columns
@@ -26,6 +27,7 @@ export async function migrate({
skipScripts,
executeAllLinks,
executeSafeLinks,
concurrency,
logger,
container,
}: {
@@ -34,6 +36,7 @@ export async function migrate({
skipScripts: boolean
executeAllLinks: boolean
executeSafeLinks: boolean
concurrency?: number
logger: Logger
container: MedusaContainer
}): Promise<boolean> {
@@ -43,6 +46,16 @@ export async function migrate({
await ensureDbExists(container)
// If pgstream is enabled, force concurrency to 1
const pgstreamEnabled = await isPgstreamEnabled(container)
if (pgstreamEnabled) {
concurrency = 1
}
if (isDefined(concurrency)) {
process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
}
const medusaAppLoader = new MedusaAppLoader()
const configModule = container.resolve(
ContainerRegistrationKeys.CONFIG_MODULE
@@ -80,6 +93,7 @@ export async function migrate({
executeSafe: executeSafeLinks,
directory,
container,
concurrency,
})
}
@@ -112,6 +126,7 @@ const main = async function ({
skipScripts,
executeAllLinks,
executeSafeLinks,
concurrency,
}) {
const container = await initializeContainer(directory)
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
@@ -123,6 +138,7 @@ const main = async function ({
skipScripts,
executeAllLinks,
executeSafeLinks,
concurrency,
logger,
container,
})

View File

@@ -9,6 +9,7 @@ import {
import {
ContainerRegistrationKeys,
getResolvedPlugins,
isDefined,
mergePluginModules,
} from "@medusajs/framework/utils"
import boxen from "boxen"
@@ -16,7 +17,7 @@ import chalk from "chalk"
import { join } from "path"
import { initializeContainer } from "../../loaders"
import { ensureDbExists } from "../utils"
import { ensureDbExists, isPgstreamEnabled } from "../utils"
/**
* Groups action tables by their "action" property
@@ -102,15 +103,27 @@ export async function syncLinks(
executeSafe,
directory,
container,
concurrency,
}: {
executeSafe: boolean
executeAll: boolean
directory: string
container: MedusaContainer
concurrency?: number
}
) {
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
// Check if pgstream is enabled - if so, force concurrency to 1
const pgstreamEnabled = await isPgstreamEnabled(container)
if (pgstreamEnabled) {
concurrency = 1
}
if (isDefined(concurrency)) {
process.env.DB_MIGRATION_CONCURRENCY = String(concurrency)
}
const planner = await medusaAppLoader.getLinksExecutionPlanner()
logger.info("Syncing links...")
@@ -192,7 +205,12 @@ export async function syncLinks(
}
}
const main = async function ({ directory, executeSafe, executeAll }) {
const main = async function ({
directory,
executeSafe,
executeAll,
concurrency,
}) {
const container = await initializeContainer(directory)
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
@@ -218,6 +236,7 @@ const main = async function ({ directory, executeSafe, executeAll }) {
executeSafe,
directory,
container,
concurrency,
})
process.exit()
} catch (error) {

View File

@@ -19,3 +19,21 @@ export async function ensureDbExists(container: MedusaContainer) {
process.exit(1)
}
}
export async function isPgstreamEnabled(
container: MedusaContainer
): Promise<boolean> {
const pgConnection = container.resolve(
ContainerRegistrationKeys.PG_CONNECTION
)
try {
const result = await pgConnection.raw(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'pgstream'"
)
return result.rows.length > 0
} catch (error) {
// If there's an error checking, assume pgstream is not enabled
return false
}
}

View File

@@ -6,18 +6,18 @@ import {
PlannerActionLinkDescriptor,
} from "@medusajs/framework/types"
import {
arrayDifference,
DALUtils,
ModulesSdkUtils,
normalizeMigrationSQL,
promiseAll,
} from "@medusajs/framework/utils"
import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core"
import {
DatabaseSchema,
PostgreSqlDriver,
} from "@medusajs/framework/mikro-orm/postgresql"
import {
arrayDifference,
DALUtils,
executeWithConcurrency,
ModulesSdkUtils,
normalizeMigrationSQL,
} from "@medusajs/framework/utils"
import { generateEntity } from "../utils"
/**
@@ -195,7 +195,7 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
INSERT INTO "${
this.tableName
}" (table_name, link_descriptor) VALUES (?, ?);
}" (table_name, link_descriptor) VALUES (?, ?) ON CONFLICT DO NOTHING;
${sql}
`,
@@ -513,22 +513,30 @@ export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise<void> {
const orm = await this.createORM()
await promiseAll(
actionPlan.map(async (action) => {
switch (action.action) {
case "delete":
return await this.dropLinkTable(orm, action.tableName)
case "create":
return await this.createLinkTable(orm, action)
case "update":
const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
action.sql
}`
return await orm.em.getDriver().getConnection().execute(sql)
default:
return
}
})
).finally(() => orm.close(true))
try {
const concurrency = parseInt(process.env.DB_MIGRATION_CONCURRENCY ?? "1")
await executeWithConcurrency(
actionPlan.map((action) => {
return async () => {
switch (action.action) {
case "delete":
return await this.dropLinkTable(orm, action.tableName)
case "create":
return await this.createLinkTable(orm, action)
case "update":
const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
action.sql
}`
return await orm.em.getDriver().getConnection().execute(sql)
default:
return
}
}
}),
concurrency
)
} finally {
await orm.close(true)
}
}
}