feat(core, medusa, cli): Enable migration scripts (#10960)

* feat(core, medusa): Enable migration scripts

* spacing

* rm unnecessary import

* Allow skipping script migrations

* fix missing options

* add options

* add tests and small changes

* update

* add checks

* add lock mechanism to be extra safe

* Create six-bears-vanish.md

* update queries

* fix tests

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-01-15 18:51:37 +01:00
committed by GitHub
parent 62d543e691
commit 0924164e86
11 changed files with 589 additions and 1 deletions

View File

@@ -31,7 +31,8 @@
"./orchestration": "./dist/orchestration/index.js",
"./workflows-sdk": "./dist/workflows-sdk/index.js",
"./workflows-sdk/composer": "./dist/workflows-sdk/composer.js",
"./modules-sdk": "./dist/modules-sdk/index.js"
"./modules-sdk": "./dist/modules-sdk/index.js",
"./migrations": "./dist/migrations/index.js"
},
"engines": {
"node": ">=20"

View File

@@ -11,6 +11,7 @@ export * from "./subscribers"
export * from "./workflows"
export * from "./telemetry"
export * from "./zod"
export * from "./migrations"
export const MEDUSA_CLI_PATH = require.resolve("@medusajs/cli")

View File

@@ -0,0 +1,172 @@
import { MedusaContainer } from "@medusajs/types"
import { MigrationScriptsMigrator } from "../run-migration-scripts"
import { jest } from "@jest/globals"
import path from "path"
import { ContainerRegistrationKeys, Modules } from "@medusajs/utils"

// Unit tests for MigrationScriptsMigrator. All external collaborators (the pg
// connection and the locking module) are jest mocks resolved through a stub
// container, so no database is required.

// Fake Knex connection: every SQL statement issued by the migrator goes
// through `raw`, which the tests assert on.
const mockPgConnection = {
  raw: jest.fn(),
}

// Fake locking module; run() acquires/releases a lock around script execution.
const mockLockService = {
  acquire: jest.fn(),
  release: jest.fn(),
}

// Minimal container stub exposing only the two registrations the migrator
// resolves; any other key is a test bug, hence the throw.
const mockContainer = {
  resolve: (key: string) => {
    if (key === ContainerRegistrationKeys.PG_CONNECTION) {
      return mockPgConnection
    }
    if (key === Modules.LOCKING) {
      return mockLockService
    }
    throw new Error(`Unknown key: ${key}`)
  },
} as unknown as MedusaContainer

describe("MigrationScriptsMigrator", () => {
  let migrator: MigrationScriptsMigrator

  beforeEach(() => {
    jest.clearAllMocks()
    migrator = new MigrationScriptsMigrator({ container: mockContainer })
    // Overwrite the protected connection with the mock; @ts-ignore because the
    // field is not part of the public surface.
    // @ts-ignore
    migrator.pgConnection = mockPgConnection
  })

  describe("run", () => {
    it("should successfully run migration scripts", async () => {
      const mockScript = jest.fn()
      const scriptPath = "/path/to/migration.ts"
      // Stub out the pieces of the base class that would touch disk/db.
      jest
        .spyOn(migrator as any, "getPendingMigrations")
        .mockResolvedValue([scriptPath])
      jest
        .spyOn(migrator as any, "insertMigration")
        .mockResolvedValue(undefined)
      jest
        .spyOn(migrator as any, "trackDuration")
        .mockReturnValue({ getSeconds: () => 1 })
      // NOTE(review): jest.mock with a variable path inside a test body is not
      // hoisted by babel-jest; confirm the virtual module is actually
      // registered before the migrator dynamically imports scriptPath
      // (jest.doMock is the usual API for runtime registration).
      jest.mock(
        scriptPath,
        () => ({
          default: mockScript,
        }),
        { virtual: true }
      )

      await migrator.run([scriptPath])

      expect(mockScript).toHaveBeenCalled()
      // Successful scripts get stamped with finished_at via an UPDATE.
      expect(mockPgConnection.raw).toHaveBeenCalledWith(
        expect.stringContaining("UPDATE script_migrations"),
        [path.basename(scriptPath)]
      )
      // The bookkeeping row is inserted before execution; the value is wrapped
      // in literal single quotes by the migrator itself.
      expect(migrator["insertMigration"]).toHaveBeenCalledWith([
        { script_name: `'${path.basename(scriptPath)}'` },
      ])
    })

    it("should handle failed migrations by cleaning up", async () => {
      const scriptPath = "/path/to/failing-migration.ts"
      const error = new Error("Migration failed")
      jest
        .spyOn(migrator as any, "getPendingMigrations")
        .mockResolvedValue([scriptPath])
      jest
        .spyOn(migrator as any, "insertMigration")
        .mockResolvedValue(undefined)
      jest
        .spyOn(migrator as any, "trackDuration")
        .mockReturnValue({ getSeconds: () => 1 })
      // The script itself rejects, which should surface from run() after the
      // bookkeeping row is removed.
      const mockFailingScript = jest.fn().mockRejectedValue(error as never)
      jest.mock(
        scriptPath,
        () => ({
          default: mockFailingScript,
        }),
        { virtual: true }
      )

      await expect(migrator.run([scriptPath])).rejects.toThrow(
        "Migration failed"
      )
      // On failure the inserted row is deleted so the script can be retried.
      expect(mockPgConnection.raw).toHaveBeenCalledWith(
        expect.stringContaining("DELETE FROM script_migrations"),
        [path.basename(scriptPath)]
      )
    })

    it("should skip migration when unique constraint error occurs", async () => {
      // A unique-constraint violation on insert means another process already
      // claimed (and presumably ran) this script — run() must skip it.
      const scriptPath = "/path/to/migration.ts"
      const uniqueError = new Error("Unique constraint violation")
      ;(uniqueError as any).constraint = "idx_script_name_unique"
      jest
        .spyOn(migrator as any, "getPendingMigrations")
        .mockResolvedValue([scriptPath])
      jest
        .spyOn(migrator as any, "insertMigration")
        .mockRejectedValue(uniqueError)
      jest
        .spyOn(migrator as any, "trackDuration")
        .mockReturnValue({ getSeconds: () => 1 })
      const mockScript = jest.fn()
      jest.mock(
        scriptPath,
        () => ({
          default: mockScript,
        }),
        { virtual: true }
      )

      await migrator.run([scriptPath])

      // The script body must not execute and no finished_at stamp is written.
      expect(mockScript).not.toHaveBeenCalled()
      expect(mockPgConnection.raw).not.toHaveBeenCalledWith(
        expect.stringContaining("UPDATE script_migrations")
      )
    })
  })

  describe("getPendingMigrations", () => {
    it("should return only non-executed migrations", async () => {
      // Comparison is done on basenames: one file already recorded, one not.
      const executedMigration = "executed.ts"
      const pendingMigration = "pending.ts"
      jest
        .spyOn(migrator as any, "getExecutedMigrations")
        .mockResolvedValue([{ script_name: executedMigration }])
      jest
        .spyOn(migrator as any, "loadMigrationFiles")
        .mockResolvedValue([
          `/path/to/${executedMigration}`,
          `/path/to/${pendingMigration}`,
        ])

      const result = await migrator.getPendingMigrations(["/path/to"])

      expect(result).toHaveLength(1)
      expect(result[0]).toContain(pendingMigration)
    })
  })

  describe("createMigrationTable", () => {
    it("should create migration table if it doesn't exist", async () => {
      await (migrator as any).createMigrationTable()

      expect(mockPgConnection.raw).toHaveBeenCalledWith(
        expect.stringContaining("CREATE TABLE IF NOT EXISTS script_migrations")
      )
    })
  })
})

View File

@@ -0,0 +1,2 @@
export * from "./migrator"
export * from "./run-migration-scripts"

View File

@@ -0,0 +1,158 @@
import { join } from "path"
import { glob } from "glob"
import { logger } from "../logger"
import { MedusaContainer } from "@medusajs/types"
import { ContainerRegistrationKeys } from "../utils"
import { Knex } from "@mikro-orm/knex"
/**
 * Base class shared by migration runners.
 *
 * Responsibilities:
 * - resolve the pg connection from the Medusa container,
 * - verify the database and the bookkeeping table exist,
 * - record executed migrations in `migration_table_name`,
 * - discover migration files (*.js / *.ts, excluding index files) on disk.
 *
 * Subclasses provide the table name/DDL and the actual execution strategy.
 */
export abstract class Migrator {
  /** Name of the bookkeeping table; defined by each concrete migrator. */
  protected abstract migration_table_name: string

  protected container: MedusaContainer
  protected pgConnection: Knex<any>

  // Cache of basePath -> discovered script paths so repeated calls to
  // loadMigrationFiles do not re-glob the same directory.
  #alreadyLoadedPaths: Map<string, any> = new Map()

  constructor({ container }: { container: MedusaContainer }) {
    this.container = container
    this.pgConnection = this.container.resolve(
      ContainerRegistrationKeys.PG_CONNECTION
    )
  }

  /**
   * Util to track duration using hrtime
   */
  protected trackDuration() {
    const startTime = process.hrtime()
    return {
      /** Elapsed seconds since trackDuration() was called, as a 2-decimal string. */
      getSeconds() {
        const duration = process.hrtime(startTime)
        return (duration[0] + duration[1] / 1e9).toFixed(2)
      },
    }
  }

  /**
   * Check that the target database is reachable.
   *
   * Logs a hint to run `db:create` when the database itself is missing
   * (pg error code 3D000) and rethrows the underlying error in all cases.
   */
  async ensureDatabase(): Promise<void> {
    try {
      // Reuse the connection resolved in the constructor instead of
      // resolving the same container registration a second time.
      await this.pgConnection.raw("SELECT 1 + 1;")
    } catch (error) {
      // 3D000 = invalid_catalog_name: the database does not exist.
      if (error.code === "3D000") {
        logger.error(
          `Cannot run migrations. ${error.message.replace("error: ", "")}`
        )
        logger.info(`Run command "db:create" to create the database`)
      } else {
        logger.error(error)
      }
      throw error
    }
  }

  /**
   * Create the migrations bookkeeping table when it does not exist yet.
   */
  async ensureMigrationsTable(): Promise<void> {
    try {
      // Check if the table exists. The table name is passed as a binding
      // (it is compared as a value here, not used as an identifier).
      const tableExists = await this.pgConnection.raw(
        `
        SELECT EXISTS (
          SELECT FROM information_schema.tables
          WHERE table_schema = 'public'
          AND table_name = ?
        );
      `,
        [this.migration_table_name]
      )

      if (!tableExists.rows[0].exists) {
        logger.info(
          `Creating migrations table '${this.migration_table_name}'...`
        )
        await this.createMigrationTable()
        logger.info("Migrations table created successfully")
      }
    } catch (error) {
      logger.error("Failed to ensure migrations table exists:", error)
      throw error
    }
  }

  /**
   * Return every row of the bookkeeping table.
   */
  async getExecutedMigrations(): Promise<{ script_name: string }[]> {
    try {
      const result = await this.pgConnection.raw(
        `SELECT * FROM ${this.migration_table_name}`
      )
      return result.rows
    } catch (error) {
      logger.error("Failed to get executed migrations:", error)
      throw error
    }
  }

  /**
   * Insert one or more rows into the bookkeeping table.
   *
   * All records are expected to share the shape of the first one. A no-op on
   * an empty array.
   *
   * Fix: placeholders are generated per record and per column
   * (`VALUES (?, ?), (?, ?)`); the previous implementation emitted a single
   * group whose placeholder count tracked the number of records, producing a
   * malformed statement for multi-record or multi-column inserts.
   *
   * NOTE(review): callers currently pass values already wrapped in single
   * quotes (e.g. `'my-script.ts'`); since values are bound as parameters the
   * quotes become part of the stored value — verify this stays consistent
   * with the plain `?` bindings used by later UPDATE/DELETE lookups.
   */
  async insertMigration(records: Record<string, any>[]): Promise<void> {
    if (!records.length) {
      return
    }

    try {
      const columns = Object.keys(records[0])
      // One parenthesized placeholder group per record.
      const placeholders = records
        .map(() => `(${columns.map(() => "?").join(", ")})`)
        .join(", ")
      // Flatten the records into a single bindings list, in column order.
      const bindings = records.flatMap((record) =>
        columns.map((column) => record[column])
      )
      await this.pgConnection.raw(
        `INSERT INTO ${this.migration_table_name} (${columns.join(
          ", "
        )}) VALUES ${placeholders}`,
        bindings
      )
    } catch (error) {
      logger.error(
        `Failed to update migration table '${this.migration_table_name}':`,
        error
      )
      throw error
    }
  }

  /**
   * Load migration files from the given paths
   *
   * @param paths - The paths to load migration files from
   * @param options - The options for loading migration files
   * @param options.force - Whether to force loading migration files even if they have already been loaded
   * @returns The loaded migration file paths
   */
  async loadMigrationFiles(
    paths: string[],
    { force }: { force?: boolean } = { force: false }
  ): Promise<string[]> {
    const allScripts: string[] = []

    for (const basePath of paths) {
      // Serve from the cache unless the caller explicitly asks to re-scan.
      if (!force && this.#alreadyLoadedPaths.has(basePath)) {
        allScripts.push(...this.#alreadyLoadedPaths.get(basePath))
        continue
      }

      try {
        const scriptFiles = glob.sync("*.{js,ts}", {
          cwd: basePath,
          ignore: ["**/index.{js,ts}"],
        })

        if (!scriptFiles?.length) {
          continue
        }

        const filePaths = scriptFiles.map((script) => join(basePath, script))
        this.#alreadyLoadedPaths.set(basePath, filePaths)
        allScripts.push(...filePaths)
      } catch (error) {
        logger.error(`Failed to load migration files from ${basePath}:`, error)
        throw error
      }
    }

    return allScripts
  }

  /** Create the bookkeeping table (DDL is subclass-specific). */
  protected abstract createMigrationTable(): Promise<void>

  /** Execute the migration strategy. */
  abstract run(...args: any[]): Promise<any>

  /** Compute which discovered migration files have not been executed yet. */
  abstract getPendingMigrations(migrationPaths: string[]): Promise<string[]>
}

View File

@@ -0,0 +1,113 @@
import { MedusaContainer } from "@medusajs/types"
import { dynamicImport, Modules } from "@medusajs/utils"
import { basename } from "path"
import { logger } from "../logger"
import { Migrator } from "./migrator"
/**
 * Runs one-off migration scripts (files with a default-exported async
 * function) exactly once, recording each execution in the
 * `script_migrations` table. A distributed lock serializes concurrent runs
 * across processes, and a unique index on script_name acts as a second
 * guard when two processes race past the lock.
 */
export class MigrationScriptsMigrator extends Migrator {
  protected migration_table_name = "script_migrations"

  constructor({ container }: { container: MedusaContainer }) {
    super({ container })
  }

  /**
   * Run the migration scripts
   * @param paths - The paths from which to load the scripts
   */
  async run(paths: string[]): Promise<void> {
    const lockService = this.container.resolve(Modules.LOCKING)
    const lockKey = "migration-scripts-running"
    // NOTE(review): the lock is acquired without an ownerId and expires after
    // 1 hour — a run whose scripts take longer could lose the lock while
    // still executing. Confirm this window is acceptable.
    await lockService.acquire(lockKey, {
      expire: 60 * 60,
    })

    try {
      const scriptPaths = await this.getPendingMigrations(paths)
      for (const script of scriptPaths) {
        const scriptFn = await dynamicImport(script)

        // Each script must default-export the function to execute.
        if (!scriptFn.default) {
          throw new Error(
            `Failed to load migration script ${script}. No default export found.`
          )
        }

        const scriptName = basename(script)

        // Claim the script by inserting its row BEFORE executing it; the
        // insert error (if any) is captured rather than thrown so the
        // unique-constraint case below can be detected.
        // NOTE(review): the value is wrapped in literal single quotes before
        // being bound as a parameter; if the driver escapes bindings, the
        // stored name will include the quotes — verify it matches the plain
        // `?` bindings used by the UPDATE/DELETE helpers below.
        const err = await this.insertMigration([
          { script_name: `'${scriptName}'` },
        ]).catch((e) => e)

        /**
         * In case another processes is running in parallel, the migration might
         * have already been executed and therefore the insert will fail because of the
         * unique constraint.
         */
        if (err) {
          if (err.constraint === "idx_script_name_unique") {
            continue
          }
          throw err
        }

        logger.info(`Running migration script ${script}`)
        try {
          const tracker = this.trackDuration()

          await scriptFn.default({ container: this.container })

          logger.info(
            `Migration script ${script} completed (${tracker.getSeconds()}s)`
          )

          // Mark success only after the script finished without throwing.
          await this.#updateMigrationFinishedAt(scriptName)
        } catch (error) {
          logger.error(`Failed to run migration script ${script}:`, error)
          // Roll back the claim so a later run can retry this script.
          await this.#deleteMigration(scriptName)
          throw error
        }
      }
    } finally {
      // Always release the lock, even when a script failed.
      await lockService.release(lockKey)
    }
  }

  /**
   * Returns discovered script paths whose basename is not yet recorded in
   * the script_migrations table.
   */
  async getPendingMigrations(migrationPaths: string[]): Promise<string[]> {
    const executedMigrations = new Set(
      (await this.getExecutedMigrations()).map((item) => item.script_name)
    )
    const all = await this.loadMigrationFiles(migrationPaths)

    return all.filter((item) => !executedMigrations.has(basename(item)))
  }

  /**
   * Creates the bookkeeping table plus the unique index on script_name that
   * run() relies on to detect concurrent duplicate executions.
   */
  protected async createMigrationTable(): Promise<void> {
    await this.pgConnection.raw(`
      CREATE TABLE IF NOT EXISTS ${this.migration_table_name} (
        id SERIAL PRIMARY KEY,
        script_name VARCHAR(255) NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        finished_at TIMESTAMP WITH TIME ZONE
      );
      CREATE UNIQUE INDEX IF NOT EXISTS idx_script_name_unique ON ${this.migration_table_name} (script_name);
    `)
  }

  // Stamp a successfully completed script with finished_at.
  #updateMigrationFinishedAt(scriptName: string) {
    return this.pgConnection.raw(
      `UPDATE ${this.migration_table_name} SET finished_at = CURRENT_TIMESTAMP WHERE script_name = ?`,
      [scriptName]
    )
  }

  // Remove a failed script's row so it can be retried on a later run.
  #deleteMigration(scriptName: string) {
    return this.pgConnection.raw(
      `DELETE FROM ${this.migration_table_name} WHERE script_name = ?`,
      [scriptName]
    )
  }
}