breaking: rework how links database migrations are managed (#8162)
This commit is contained in:
committed by
GitHub
parent
f435c6c7f6
commit
f74fdcb644
@@ -1,134 +1,394 @@
|
||||
import {
|
||||
JoinerRelationship,
|
||||
LoaderOptions,
|
||||
Logger,
|
||||
ILinkMigrationsPlanner,
|
||||
LinkMigrationsPlannerAction,
|
||||
ModuleJoinerConfig,
|
||||
ModuleServiceInitializeOptions,
|
||||
PlannerActionLinkDescriptor,
|
||||
} from "@medusajs/types"
|
||||
|
||||
import { generateEntity } from "../utils"
|
||||
import { EntitySchema, MikroORM } from "@mikro-orm/core"
|
||||
import { DatabaseSchema, PostgreSqlDriver } from "@mikro-orm/postgresql"
|
||||
import {
|
||||
arrayDifference,
|
||||
DALUtils,
|
||||
ModulesSdkUtils,
|
||||
promiseAll,
|
||||
} from "@medusajs/utils"
|
||||
|
||||
import { DALUtils, ModulesSdkUtils } from "@medusajs/utils"
|
||||
/**
|
||||
* The migrations execution planner creates a plan of SQL queries
|
||||
* to be executed to keep link modules database state in sync
|
||||
* with the links defined inside the user application.
|
||||
*/
|
||||
export class MigrationsExecutionPlanner implements ILinkMigrationsPlanner {
|
||||
/**
|
||||
* Database options for the module service
|
||||
*/
|
||||
#dbConfig: ReturnType<typeof ModulesSdkUtils.loadDatabaseConfig>
|
||||
|
||||
export function getMigration(
|
||||
joinerConfig: ModuleJoinerConfig,
|
||||
serviceName: string,
|
||||
primary: JoinerRelationship,
|
||||
foreign: JoinerRelationship
|
||||
) {
|
||||
return async function runMigrations(
|
||||
{
|
||||
options,
|
||||
logger,
|
||||
}: Pick<
|
||||
LoaderOptions<ModuleServiceInitializeOptions>,
|
||||
"options" | "logger"
|
||||
> = {} as any
|
||||
/**
|
||||
* The set of commands that are unsafe to execute automatically when
|
||||
* performing "alter table"
|
||||
*/
|
||||
#unsafeSQLCommands = ["alter column", "drop column"]
|
||||
|
||||
/**
|
||||
* On-the-fly computed set of entities for the user provided joinerConfig and the link it is coming from
|
||||
*/
|
||||
#linksEntities: {
|
||||
linkDescriptor: PlannerActionLinkDescriptor
|
||||
entity: EntitySchema
|
||||
}[]
|
||||
|
||||
/**
|
||||
* The table that keeps a track of tables generated by the link
|
||||
* module.
|
||||
*/
|
||||
protected tableName = "link_module_migrations"
|
||||
|
||||
constructor(
|
||||
joinerConfig: ModuleJoinerConfig[],
|
||||
options?: ModuleServiceInitializeOptions
|
||||
) {
|
||||
logger ??= console as unknown as Logger
|
||||
this.#dbConfig = ModulesSdkUtils.loadDatabaseConfig("link_modules", options)
|
||||
this.#linksEntities = joinerConfig
|
||||
.map((config) => {
|
||||
if (config.isReadOnlyLink) {
|
||||
return
|
||||
}
|
||||
|
||||
const dbData = ModulesSdkUtils.loadDatabaseConfig("link_modules", options)
|
||||
const entity = generateEntity(joinerConfig, primary, foreign)
|
||||
const pathToMigrations = __dirname + "/../migrations"
|
||||
const [primary, foreign] = config.relationships ?? []
|
||||
const linkDescriptor: PlannerActionLinkDescriptor = {
|
||||
fromModule: primary.serviceName,
|
||||
toModule: foreign.serviceName,
|
||||
fromModel: primary.alias,
|
||||
toModel: foreign.alias,
|
||||
}
|
||||
|
||||
const orm = await DALUtils.mikroOrmCreateConnection(
|
||||
dbData,
|
||||
[entity],
|
||||
pathToMigrations
|
||||
)
|
||||
return {
|
||||
entity: generateEntity(config, primary, foreign),
|
||||
linkDescriptor,
|
||||
}
|
||||
})
|
||||
.filter((item) => !!item)
|
||||
}
|
||||
|
||||
const tableName = entity.meta.collection
|
||||
/**
|
||||
* Initializes the ORM using the normalized dbConfig and set
|
||||
* of provided entities
|
||||
*/
|
||||
protected async createORM(entities: EntitySchema[] = []) {
|
||||
return await DALUtils.mikroOrmCreateConnection(this.#dbConfig, entities, "")
|
||||
}
|
||||
|
||||
let hasTable = false
|
||||
try {
|
||||
/**
|
||||
* Ensure the table to track link modules migrations
|
||||
* exists.
|
||||
*
|
||||
* @param orm MikroORM
|
||||
*/
|
||||
protected async ensureMigrationsTable(
|
||||
orm: MikroORM<PostgreSqlDriver>
|
||||
): Promise<void> {
|
||||
await orm.em.getDriver().getConnection().execute(`
|
||||
CREATE TABLE IF NOT EXISTS "${this.tableName}" (
|
||||
id SERIAL PRIMARY KEY,
|
||||
table_name VARCHAR(255) NOT NULL UNIQUE,
|
||||
link_descriptor JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
`)
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the migrations table is in sync
|
||||
*
|
||||
* @param orm
|
||||
* @protected
|
||||
*/
|
||||
protected async ensureMigrationsTableUpToDate(
|
||||
orm: MikroORM<PostgreSqlDriver>
|
||||
) {
|
||||
const existingTables: string[] = (
|
||||
await orm.em
|
||||
.getDriver()
|
||||
.getConnection()
|
||||
.execute(`SELECT 1 FROM "${tableName}" LIMIT 0`)
|
||||
hasTable = true
|
||||
} catch {}
|
||||
.execute<
|
||||
{
|
||||
table_name: string
|
||||
}[]
|
||||
>(
|
||||
`
|
||||
SELECT table_name
|
||||
FROM information_schema.tables;
|
||||
`
|
||||
)
|
||||
)
|
||||
.map(({ table_name }) => table_name)
|
||||
.filter((tableName) =>
|
||||
this.#linksEntities.some(
|
||||
({ entity }) => entity.meta.collection === tableName
|
||||
)
|
||||
)
|
||||
|
||||
if (!existingTables.length) {
|
||||
return
|
||||
}
|
||||
|
||||
const orderedDescriptors = existingTables.map((tableName) => {
|
||||
return this.#linksEntities.find(
|
||||
({ entity }) => entity.meta.collection === tableName
|
||||
)!.linkDescriptor
|
||||
})
|
||||
|
||||
const positionalArgs = new Array(existingTables.length)
|
||||
.fill("(?, ?)")
|
||||
.join(", ")
|
||||
await orm.em
|
||||
.getDriver()
|
||||
.getConnection()
|
||||
.execute(
|
||||
`
|
||||
INSERT INTO ${this.tableName} (table_name, link_descriptor) VALUES ${positionalArgs} ON CONFLICT DO NOTHING;
|
||||
`,
|
||||
existingTables.flatMap((tableName, index) => [
|
||||
tableName,
|
||||
JSON.stringify(orderedDescriptors[index]),
|
||||
])
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert tuple to the migrations table and create the link table
|
||||
*
|
||||
* @param orm
|
||||
* @param action
|
||||
* @protected
|
||||
*/
|
||||
protected async createLinkTable(
|
||||
orm: MikroORM<PostgreSqlDriver>,
|
||||
action: LinkMigrationsPlannerAction & {
|
||||
linkDescriptor: PlannerActionLinkDescriptor
|
||||
sql: string
|
||||
}
|
||||
) {
|
||||
const { tableName, linkDescriptor, sql } = action
|
||||
|
||||
await orm.em
|
||||
.getDriver()
|
||||
.getConnection()
|
||||
.execute(
|
||||
`
|
||||
INSERT INTO "${this.tableName}" (table_name, link_descriptor) VALUES (?, ?);
|
||||
${sql}
|
||||
`,
|
||||
[tableName, linkDescriptor]
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Drops the link table and untracks it from the "link_modules_migrations"
|
||||
* table.
|
||||
*/
|
||||
protected async dropLinkTable(
|
||||
orm: MikroORM<PostgreSqlDriver>,
|
||||
tableName: string
|
||||
) {
|
||||
await orm.em.getDriver().getConnection().execute(`
|
||||
DROP TABLE IF EXISTS "${tableName}";
|
||||
DELETE FROM "${this.tableName}" WHERE table_name = '${tableName}';
|
||||
`)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an array of table names that have been tracked during
|
||||
* the last run. In short, these tables were created by the
|
||||
* link modules migrations runner.
|
||||
*
|
||||
* @param orm MikroORM
|
||||
*/
|
||||
protected async getTrackedLinksTables(
|
||||
orm: MikroORM<PostgreSqlDriver>
|
||||
): Promise<
|
||||
{ table_name: string; link_descriptor: PlannerActionLinkDescriptor }[]
|
||||
> {
|
||||
const results = await orm.em.getDriver().getConnection().execute<
|
||||
{
|
||||
table_name: string
|
||||
link_descriptor: PlannerActionLinkDescriptor
|
||||
}[]
|
||||
>(`
|
||||
SELECT table_name, link_descriptor from "${this.tableName}"
|
||||
`)
|
||||
|
||||
return results.map((tuple) => ({
|
||||
table_name: tuple.table_name,
|
||||
link_descriptor: tuple.link_descriptor,
|
||||
}))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the migration plan for a specific link entity.
|
||||
*/
|
||||
protected async getEntityMigrationPlan(
|
||||
linkDescriptor: PlannerActionLinkDescriptor,
|
||||
entity: EntitySchema,
|
||||
trackedLinksTables: string[]
|
||||
): Promise<LinkMigrationsPlannerAction> {
|
||||
const tableName = entity.meta.collection
|
||||
const orm = await this.createORM([entity])
|
||||
|
||||
const generator = orm.getSchemaGenerator()
|
||||
if (hasTable) {
|
||||
/* const updateSql = await generator.getUpdateSchemaSQL()
|
||||
const entityUpdates = updateSql
|
||||
.split(";")
|
||||
.map((sql) => sql.trim())
|
||||
.filter((sql) =>
|
||||
sql.toLowerCase().includes(`alter table "${tableName.toLowerCase()}"`)
|
||||
)
|
||||
const platform = orm.em.getPlatform()
|
||||
const connection = orm.em.getConnection()
|
||||
const schemaName = this.#dbConfig.schema || "public"
|
||||
|
||||
if (entityUpdates.length > 0) {
|
||||
try {
|
||||
await generator.execute(entityUpdates.join(";"))
|
||||
logger.info(`Link module "${serviceName}" migration executed`)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Link module "${serviceName}" migration failed to run - Error: ${error.errros ?? error}`
|
||||
)
|
||||
}
|
||||
} else {
|
||||
logger.info(`Skipping "${tableName}" migration.`)
|
||||
}*/
|
||||
// Note: Temporarily skipping this for handling no logs on the CI. Bring this back if necessary.
|
||||
// logger.info(
|
||||
// `Link module('${serviceName}'): Table already exists. Write your own migration if needed.`
|
||||
// )
|
||||
} else {
|
||||
try {
|
||||
await generator.createSchema()
|
||||
|
||||
logger.info(`Link module('${serviceName}'): Migration executed`)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Link module('${serviceName}'): Migration failed - Error: ${
|
||||
error.errros ?? error
|
||||
}`
|
||||
)
|
||||
/**
|
||||
* If the table name for the entity has not been
|
||||
* managed by us earlier, then we should create
|
||||
* it.
|
||||
*/
|
||||
if (!trackedLinksTables.includes(tableName)) {
|
||||
return {
|
||||
action: "create",
|
||||
linkDescriptor,
|
||||
tableName,
|
||||
sql: await generator.getCreateSchemaSQL(),
|
||||
}
|
||||
}
|
||||
|
||||
await orm.close()
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Pre-fetching information schema from the database and using that
|
||||
* as the way to compute the update diff.
|
||||
*
|
||||
* @note
|
||||
* The "loadInformationSchema" mutates the "dbSchema" argument provided
|
||||
* to it as the first argument.
|
||||
*/
|
||||
const dbSchema = new DatabaseSchema(platform, schemaName)
|
||||
await platform
|
||||
.getSchemaHelper?.()
|
||||
?.loadInformationSchema(dbSchema, connection, [
|
||||
{
|
||||
table_name: tableName,
|
||||
schema_name: schemaName,
|
||||
},
|
||||
])
|
||||
|
||||
export function getRevertMigration(
|
||||
joinerConfig: ModuleJoinerConfig,
|
||||
serviceName: string,
|
||||
primary: JoinerRelationship,
|
||||
foreign: JoinerRelationship
|
||||
) {
|
||||
return async function revertMigrations(
|
||||
{
|
||||
options,
|
||||
logger,
|
||||
}: Pick<
|
||||
LoaderOptions<ModuleServiceInitializeOptions>,
|
||||
"options" | "logger"
|
||||
> = {} as any
|
||||
) {
|
||||
logger ??= console as unknown as Logger
|
||||
const updateSQL = await generator.getUpdateSchemaSQL({
|
||||
fromSchema: dbSchema,
|
||||
})
|
||||
|
||||
const dbData = ModulesSdkUtils.loadDatabaseConfig("link_modules", options)
|
||||
const entity = generateEntity(joinerConfig, primary, foreign)
|
||||
const pathToMigrations = __dirname + "/../migrations"
|
||||
/**
|
||||
* Entity is upto-date and hence we do not have to perform
|
||||
* any updates on it.
|
||||
*/
|
||||
if (!updateSQL.length) {
|
||||
return {
|
||||
action: "noop",
|
||||
linkDescriptor,
|
||||
tableName,
|
||||
}
|
||||
}
|
||||
|
||||
const orm = await DALUtils.mikroOrmCreateConnection(
|
||||
dbData,
|
||||
[entity],
|
||||
pathToMigrations
|
||||
)
|
||||
const usesUnsafeCommands = this.#unsafeSQLCommands.some((fragment) => {
|
||||
return updateSQL.match(new RegExp(`${fragment}`, "ig"))
|
||||
})
|
||||
|
||||
try {
|
||||
const migrator = orm.getMigrator()
|
||||
await migrator.down()
|
||||
logger.info(`Link module "${serviceName}" migration executed`)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Link module "${serviceName}" migration failed to run - Error: ${
|
||||
error.errros ?? error
|
||||
}`
|
||||
return {
|
||||
action: usesUnsafeCommands ? "notify" : "update",
|
||||
linkDescriptor,
|
||||
tableName,
|
||||
sql: updateSQL,
|
||||
}
|
||||
} finally {
|
||||
await orm.close(true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a plan to executed in order to keep the database state in
|
||||
* sync with the user-defined links.
|
||||
*
|
||||
* This method only creates a plan and does not change the database
|
||||
* state. You must call the "executePlan" method for that.
|
||||
*/
|
||||
async createPlan() {
|
||||
const orm = await this.createORM()
|
||||
await this.ensureMigrationsTable(orm)
|
||||
|
||||
const executionActions: LinkMigrationsPlannerAction[] = []
|
||||
|
||||
await this.ensureMigrationsTableUpToDate(orm)
|
||||
|
||||
const trackedTables = await this.getTrackedLinksTables(orm)
|
||||
const trackedTablesNames = trackedTables.map(({ table_name }) => table_name)
|
||||
|
||||
/**
|
||||
* Looping through the new set of entities and generating
|
||||
* execution plan for them
|
||||
*/
|
||||
for (let { entity, linkDescriptor } of this.#linksEntities) {
|
||||
executionActions.push(
|
||||
await this.getEntityMigrationPlan(
|
||||
linkDescriptor,
|
||||
entity,
|
||||
trackedTablesNames
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
await orm.close()
|
||||
const linksTableNames = this.#linksEntities.map(
|
||||
({ entity }) => entity.meta.collection
|
||||
)
|
||||
|
||||
/**
|
||||
* Finding the tables to be removed
|
||||
*/
|
||||
const tablesToRemove = arrayDifference(trackedTablesNames, linksTableNames)
|
||||
tablesToRemove.forEach((tableToRemove) => {
|
||||
executionActions.push({
|
||||
action: "delete",
|
||||
tableName: tableToRemove,
|
||||
linkDescriptor: trackedTables.find(
|
||||
({ table_name }) => tableToRemove === table_name
|
||||
)!.link_descriptor,
|
||||
})
|
||||
})
|
||||
|
||||
try {
|
||||
return executionActions
|
||||
} finally {
|
||||
await orm.close(true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the actionsPlan actions where the action is one of 'create' | 'update' | 'delete'.
|
||||
* 'noop' and 'notify' actions are implicitly ignored. If a notify action needs to be
|
||||
* executed, you can mutate its action to 'update', in that scenario it means that an unsafe
|
||||
* update sql (from our point of view) will be executed and some data could be lost.
|
||||
*
|
||||
* @param actionPlan
|
||||
*/
|
||||
async executePlan(actionPlan: LinkMigrationsPlannerAction[]): Promise<void> {
|
||||
const orm = await this.createORM()
|
||||
|
||||
await promiseAll(
|
||||
actionPlan.map(async (action) => {
|
||||
switch (action.action) {
|
||||
case "delete":
|
||||
return await this.dropLinkTable(orm, action.tableName)
|
||||
case "create":
|
||||
return await this.createLinkTable(orm, action)
|
||||
case "update":
|
||||
return await orm.em.getDriver().getConnection().execute(action.sql)
|
||||
default:
|
||||
return
|
||||
}
|
||||
})
|
||||
).finally(() => orm.close(true))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user