Files
medusa-store/packages/modules/link-modules/src/migration/index.ts
Carlos R. L. Rodrigues 5b7e3c0e76 chore: configurable database migration in concurrency (#14004)
> [!NOTE]
> Adds a configurable concurrency for link migrations (CLI/commands), forces concurrency=1 when pgstream is detected, and ignores duplicate link-table inserts.
> 
> - **CLI**
>   - Add `--concurrency` option to `db:migrate` and `db:sync-links`.
> - **Medusa commands**
>   - `migrate` and `sync-links` accept `concurrency`; set `DB_MIGRATION_CONCURRENCY` and force `1` when `pgstream` schema exists via new `isPgstreamEnabled`.
> - **Link Modules (migrations)**
>   - Execute plan actions with `executeWithConcurrency` using `DB_MIGRATION_CONCURRENCY`.
>   - Make link-table tracking inserts idempotent with `ON CONFLICT DO NOTHING` (including bulk/upsert and per-create).
> 
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 07432293c8fe8de30b07920fa47823b9081edacc. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup>
2025-12-04 11:30:30 +00:00

543 lines
15 KiB
TypeScript

import {
ILinkMigrationsPlanner,
LinkMigrationsPlannerAction,
ModuleJoinerConfig,
ModuleServiceInitializeOptions,
PlannerActionLinkDescriptor,
} from "@medusajs/framework/types"
import { EntitySchema, MikroORM } from "@medusajs/framework/mikro-orm/core"
import {
DatabaseSchema,
PostgreSqlDriver,
} from "@medusajs/framework/mikro-orm/postgresql"
import {
arrayDifference,
DALUtils,
executeWithConcurrency,
ModulesSdkUtils,
normalizeMigrationSQL,
} from "@medusajs/framework/utils"
import { generateEntity } from "../utils"
/**
 * The migrations execution planner creates a plan of SQL queries
 * to be executed to keep link modules database state in sync
 * with the links defined inside the user application.
 */
export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
  /**
   * Database options for the module service
   */
  #dbConfig: ReturnType<typeof ModulesSdkUtils.loadDatabaseConfig>

  /**
   * Schema used to qualify the tracking table and the link tables in
   * the SQL issued by this class. Defaults to "public".
   */
  #schema: string = "public"

  /**
   * The set of commands that are unsafe to execute automatically when
   * performing "alter table"
   */
  #unsafeSQLCommands = ["alter column", "drop column"]

  /**
   * On-the-fly computed set of entities for the user provided joinerConfig
   * and the link each entity is coming from
   */
  #linksEntities: {
    linkDescriptor: PlannerActionLinkDescriptor
    entity: EntitySchema
  }[]

  /**
   * The table that keeps track of the tables generated by the link
   * module.
   */
  protected tableName = "link_module_migrations"

  /**
   * @param joinerConfig Joiner configs of the links; read-only links are skipped
   * @param options Module options, used to load the database config and
   * to read the target schema (`options.database.schema`)
   */
  constructor(
    joinerConfig: ModuleJoinerConfig[],
    options?: ModuleServiceInitializeOptions
  ) {
    this.#dbConfig = ModulesSdkUtils.loadDatabaseConfig("link_modules", options)
    this.#schema = options?.database?.schema ?? "public"

    this.#linksEntities = joinerConfig.flatMap((config) => {
      // Read-only links have no backing table, so there is nothing to migrate.
      if (config.isReadOnlyLink) {
        return []
      }

      const [primary, foreign] = config.relationships ?? []
      const linkDescriptor: PlannerActionLinkDescriptor = {
        fromModule: primary.serviceName,
        toModule: foreign.serviceName,
        fromModel: primary.alias,
        toModel: foreign.alias,
      }

      return [
        {
          entity: generateEntity(config, primary, foreign),
          linkDescriptor,
        },
      ]
    })
  }

  /**
   * Initializes the ORM using the normalized dbConfig and set
   * of provided entities
   */
  protected async createORM(entities: EntitySchema[] = []) {
    return await DALUtils.mikroOrmCreateConnection(this.#dbConfig, entities, "")
  }

  /**
   * Ensure the table to track link modules migrations
   * exists.
   *
   * @param orm MikroORM
   */
  protected async ensureMigrationsTable(
    orm: MikroORM<PostgreSqlDriver>
  ): Promise<void> {
    await orm.em.getDriver().getConnection().execute(`
      CREATE TABLE IF NOT EXISTS "${this.#schema}"."${this.tableName}" (
        id SERIAL PRIMARY KEY,
        table_name VARCHAR(255) NOT NULL UNIQUE,
        link_descriptor JSONB NOT NULL DEFAULT '{}'::jsonb,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `)
  }

  /**
   * Ensure the migrations table is in sync: every table of the schema
   * that matches one of the configured link entities gets a tracking row.
   * Rows that already exist are left untouched (ON CONFLICT DO NOTHING).
   *
   * @param orm
   * @protected
   */
  protected async ensureMigrationsTableUpToDate(
    orm: MikroORM<PostgreSqlDriver>
  ) {
    const connection = orm.em.getDriver().getConnection()

    // The schema name is bound as a parameter rather than spliced into
    // the SQL literal.
    const schemaTables = await connection.execute<
      {
        table_name: string
      }[]
    >(
      `
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = ?;
      `,
      [this.#schema]
    )

    // First entity wins when several share a collection name, mirroring
    // a linear "find" over the configured entities.
    const descriptorByTable = new Map<string, PlannerActionLinkDescriptor>()
    for (const { entity, linkDescriptor } of this.#linksEntities) {
      if (!descriptorByTable.has(entity.meta.collection)) {
        descriptorByTable.set(entity.meta.collection, linkDescriptor)
      }
    }

    const existingTables = schemaTables
      .map(({ table_name }) => table_name)
      .filter((tableName) => descriptorByTable.has(tableName))

    if (!existingTables.length) {
      return
    }

    const positionalArgs = existingTables.map(() => "(?, ?)").join(", ")

    await connection.execute(
      `
        INSERT INTO "${this.#schema}"."${
        this.tableName
      }" (table_name, link_descriptor) VALUES ${positionalArgs} ON CONFLICT DO NOTHING;
      `,
      existingTables.flatMap((tableName) => [
        tableName,
        JSON.stringify(descriptorByTable.get(tableName)),
      ])
    )
  }

  /**
   * Insert tuple to the migrations table and create the link table
   *
   * @param orm
   * @param action The "create" action carrying the table name, the link
   * descriptor and the SQL that creates the table
   * @protected
   */
  protected async createLinkTable(
    orm: MikroORM<PostgreSqlDriver>,
    action: LinkMigrationsPlannerAction & {
      linkDescriptor: PlannerActionLinkDescriptor
      sql: string
    }
  ) {
    const { tableName, linkDescriptor, sql } = action

    await orm.em
      .getDriver()
      .getConnection()
      .execute(
        `
        SET LOCAL search_path TO "${this.#schema}";

        INSERT INTO "${
          this.tableName
        }" (table_name, link_descriptor) VALUES (?, ?) ON CONFLICT DO NOTHING;
        ${sql}
      `,
        // Serialized explicitly so the descriptor is bound the same way as
        // in the bulk insert of "ensureMigrationsTableUpToDate".
        [tableName, JSON.stringify(linkDescriptor)]
      )
  }

  /**
   * Drops the link table and untracks it from the "link_module_migrations"
   * table.
   *
   * @param orm
   * @param tableName Name of the link table to drop
   */
  protected async dropLinkTable(
    orm: MikroORM<PostgreSqlDriver>,
    tableName: string
  ) {
    await orm.em
      .getDriver()
      .getConnection()
      .execute(
        `
        DROP TABLE IF EXISTS "${this.#schema}"."${tableName}";
        DELETE FROM "${this.#schema}"."${this.tableName}" WHERE table_name = ?;
      `,
        // Bound as a value so that quotes in the name cannot break the
        // statement.
        [tableName]
      )
  }

  /**
   * Returns an array of table names that have been tracked during
   * the last run. In short, these tables were created by the
   * link modules migrations runner.
   *
   * @param orm MikroORM
   */
  protected async getTrackedLinksTables(
    orm: MikroORM<PostgreSqlDriver>
  ): Promise<
    {
      table_name: string
      link_descriptor: PlannerActionLinkDescriptor
    }[]
  > {
    const rows = await orm.em.getDriver().getConnection().execute<
      {
        table_name: string
        link_descriptor: PlannerActionLinkDescriptor
      }[]
    >(`
      SELECT table_name, link_descriptor from "${this.#schema}"."${
      this.tableName
    }"
    `)

    // Copy only the two columns of interest from every row.
    return rows.map(({ table_name, link_descriptor }) => ({
      table_name,
      link_descriptor,
    }))
  }

  /**
   * Keeps only the statements of "sqlCommand" that reference the quoted
   * table name, dropping empty statements, the "set names 'utf8'" preamble
   * and statements touching the timestamp columns.
   *
   * Statements are separated by a naive split on ";", so a semicolon inside
   * a string literal would split a statement in two.
   *
   * @param tableName Unquoted name of the table to keep statements for
   * @param sqlCommand One or more ";" separated SQL statements
   * @returns The kept statements, each terminated by ";", or an empty string
   */
  private pickTableRelatedCommands(tableName: string, sqlCommand: string) {
    const ignoreColumns = ["created_at", "updated_at", "deleted_at"]
    const quotedTableName = `"${tableName}"`

    const returnedCommands = sqlCommand
      .split(";")
      .map((command) => command.trim())
      .filter(
        (cmd) =>
          cmd.length > 0 &&
          cmd !== "set names 'utf8'" &&
          cmd.includes(quotedTableName) &&
          !ignoreColumns.some((column) => cmd.includes(`column "${column}"`))
      )

    // Every kept command is terminated by ";", including the last one.
    return returnedCommands.length > 0 ? `${returnedCommands.join(";")};` : ""
  }

  /**
   * Reads the migration concurrency from the "DB_MIGRATION_CONCURRENCY"
   * environment variable.
   *
   * @returns The parsed value when it is a positive integer, 1 otherwise
   * (unset, empty, non-numeric, zero or negative).
   */
  private resolveConcurrency(): number {
    const parsed = Number.parseInt(
      process.env.DB_MIGRATION_CONCURRENCY ?? "1",
      10
    )

    return Number.isInteger(parsed) && parsed >= 1 ? parsed : 1
  }

  /**
   * Returns the migration plan for a specific link entity.
   *
   * @param linkDescriptor Descriptor of the link the entity belongs to
   * @param entity Entity whose table is being planned
   * @param trackedLinksTables Names of the tables already tracked
   * @returns A "create" action for untracked tables, "noop" when there is
   * nothing to change, "notify" when the update contains unsafe commands
   * and "update" otherwise
   */
  protected async getEntityMigrationPlan(
    linkDescriptor: PlannerActionLinkDescriptor,
    entity: EntitySchema,
    trackedLinksTables: string[]
  ): Promise<LinkMigrationsPlannerAction> {
    const tableName = entity.meta.collection
    const orm = await this.createORM([entity])

    try {
      const generator = orm.getSchemaGenerator()
      const platform = orm.em.getPlatform()
      const connection = orm.em.getConnection()
      // NOTE(review): the schema is read from the database config here while
      // the other methods use the schema taken from the module options —
      // confirm both always agree.
      const schemaName = this.#dbConfig.schema || "public"

      /**
       * If the table name for the entity has not been
       * managed by us earlier, then we should create
       * it.
       */
      if (!trackedLinksTables.includes(tableName)) {
        return {
          action: "create",
          linkDescriptor,
          tableName,
          sql: normalizeMigrationSQL(await generator.getCreateSchemaSQL()),
        }
      }

      /**
       * Pre-fetching information schema from the database and using that
       * as the way to compute the update diff.
       *
       * @note
       * The "loadInformationSchema" mutates the "dbSchema" argument provided
       * to it as the first argument.
       */
      const dbSchema = new DatabaseSchema(platform, schemaName)
      await platform
        .getSchemaHelper?.()
        ?.loadInformationSchema(dbSchema, connection, [
          {
            table_name: tableName,
            schema_name: schemaName,
          },
        ])

      const updateSQL = this.pickTableRelatedCommands(
        tableName,
        normalizeMigrationSQL(
          await generator.getUpdateSchemaSQL({
            fromSchema: dbSchema,
          })
        )
      )

      /**
       * Entity is up-to-date and hence we do not have to perform
       * any updates on it.
       */
      if (!updateSQL.length) {
        return {
          action: "noop",
          linkDescriptor,
          tableName,
        }
      }

      // Case-insensitive search for any of the unsafe fragments.
      const usesUnsafeCommands = this.#unsafeSQLCommands.some((fragment) =>
        new RegExp(fragment, "i").test(updateSQL)
      )

      return {
        action: usesUnsafeCommands ? "notify" : "update",
        linkDescriptor,
        tableName,
        sql: updateSQL,
      }
    } finally {
      await orm.close(true)
    }
  }

  /**
   * This method loops over the tables we have fetched from the
   * "link_module_migrations" table and checks if their new
   * name is different from the tracked name and in that
   * case it will rename the actual table and also the
   * tracked entry.
   *
   * @param orm
   * @param trackedTables Rows read from the tracking table
   * @returns The tracked rows, with "table_name" replaced for renamed tables
   */
  protected async migrateOldTables(
    orm: MikroORM<PostgreSqlDriver>,
    trackedTables: {
      table_name: string
      link_descriptor: PlannerActionLinkDescriptor
    }[]
  ) {
    // Null-safe: the tracking table defaults "link_descriptor" to an empty
    // object, so a stored descriptor may miss its fields.
    const normalize = (value?: string) => value?.toLowerCase()

    const migratedTables: {
      table_name: string
      link_descriptor: PlannerActionLinkDescriptor
    }[] = []

    for (const trackedTable of trackedTables) {
      const tracked = trackedTable.link_descriptor

      // Models are compared verbatim, module names case-insensitively.
      const linkEntity = this.#linksEntities.find(
        ({ linkDescriptor }) =>
          linkDescriptor.fromModel === tracked.fromModel &&
          linkDescriptor.toModel === tracked.toModel &&
          normalize(linkDescriptor.fromModule) ===
            normalize(tracked.fromModule) &&
          normalize(linkDescriptor.toModule) === normalize(tracked.toModule)
      )

      const newTableName = linkEntity?.entity.meta.collection

      /**
       * Perform rename
       */
      if (
        linkEntity &&
        newTableName &&
        trackedTable.table_name !== newTableName
      ) {
        await this.renameOldTable(
          orm,
          trackedTable.table_name,
          newTableName,
          linkEntity.linkDescriptor
        )

        migratedTables.push({
          ...trackedTable,
          table_name: newTableName,
        })
      } else {
        migratedTables.push({
          ...trackedTable,
        })
      }
    }

    return migratedTables
  }

  /**
   * Renames existing table and also its tracked entry
   *
   * @param orm
   * @param oldName Current name of the table
   * @param newName Name the table must be renamed to
   * @param descriptor Descriptor stored alongside the new name
   */
  protected async renameOldTable(
    orm: MikroORM<PostgreSqlDriver>,
    oldName: string,
    newName: string,
    descriptor: PlannerActionLinkDescriptor
  ) {
    // "RENAME TO" accepts a bare table name only: the renamed table stays
    // in its schema, and a schema-qualified target is a syntax error in
    // PostgreSQL. Values are bound instead of being spliced into literals.
    await orm.em
      .getDriver()
      .getConnection()
      .execute(
        `
        ALTER TABLE "${this.#schema}"."${oldName}" RENAME TO "${newName}";
        UPDATE "${this.#schema}"."${this.tableName}" SET table_name = ?, link_descriptor = ? WHERE table_name = ?;
      `,
        [newName, JSON.stringify(descriptor), oldName]
      )
  }

  /**
   * Creates a plan to be executed in order to keep the database state in
   * sync with the user-defined links.
   *
   * The link tables themselves are only changed by "executePlan". Creating
   * the plan does however maintain the tracking table: it is created when
   * missing, back-filled for existing link tables, and tables whose name
   * changed are renamed.
   */
  async createPlan() {
    const orm = await this.createORM()

    try {
      await this.ensureMigrationsTable(orm)

      const executionActions: LinkMigrationsPlannerAction[] = []

      await this.ensureMigrationsTableUpToDate(orm)

      const trackedTables = await this.migrateOldTables(
        orm,
        await this.getTrackedLinksTables(orm)
      )

      const trackedTablesNames = trackedTables.map(
        ({ table_name }) => table_name
      )

      /**
       * Looping through the new set of entities and generating
       * execution plan for them
       */
      for (const { entity, linkDescriptor } of this.#linksEntities) {
        executionActions.push(
          await this.getEntityMigrationPlan(
            linkDescriptor,
            entity,
            trackedTablesNames
          )
        )
      }

      const linksTableNames = this.#linksEntities.map(
        ({ entity }) => entity.meta.collection
      )

      // First tracked row wins per table name, mirroring a linear "find".
      const descriptorByTrackedTable = new Map<
        string,
        PlannerActionLinkDescriptor
      >()
      for (const { table_name, link_descriptor } of trackedTables) {
        if (!descriptorByTrackedTable.has(table_name)) {
          descriptorByTrackedTable.set(table_name, link_descriptor)
        }
      }

      /**
       * Finding the tables to be removed
       */
      const tablesToRemove = arrayDifference(
        trackedTablesNames,
        linksTableNames
      )

      for (const tableToRemove of tablesToRemove) {
        executionActions.push({
          action: "delete",
          tableName: tableToRemove,
          // Always present: the names to remove come from the tracked tables.
          linkDescriptor: descriptorByTrackedTable.get(tableToRemove)!,
        })
      }

      return executionActions
    } finally {
      await orm.close(true)
    }
  }

  /**
   * Executes the actionsPlan actions where the action is one of 'create' | 'update' | 'delete'.
   * 'noop' and 'notify' actions are implicitly ignored. If a notify action needs to be
   * executed, you can mutate its action to 'update', in that scenario it means that an unsafe
   * update sql (from our point of view) will be executed and some data could be lost.
   *
   * The number of actions handed to "executeWithConcurrency" at once is read
   * from the "DB_MIGRATION_CONCURRENCY" environment variable and falls back
   * to 1 when the variable is unset or not a positive integer.
   *
   * @param actionPlan
   */
  async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise<void> {
    const orm = await this.createORM()

    try {
      const tasks = actionPlan.map((action) => {
        return async () => {
          switch (action.action) {
            case "delete":
              return await this.dropLinkTable(orm, action.tableName)
            case "create":
              return await this.createLinkTable(orm, action)
            case "update": {
              const sql = `SET LOCAL search_path TO "${this.#schema}"; \n\n${
                action.sql
              }`
              return await orm.em.getDriver().getConnection().execute(sql)
            }
            default:
              return
          }
        }
      })

      await executeWithConcurrency(tasks, this.resolveConcurrency())
    } finally {
      await orm.close(true)
    }
  }
}