fix: Resume workflow execution (#7103)

Supercedes #7051 – if OK, I'll move the base of this PR to `develop` and we can run reviews only of this one.

**What**
- Gracefully close BullMQ Worker in Redis Event Bus and Redis Workflow Engine (by @edast, @sradevski)
- Register workflows before MedusaApp is loaded*
- Introduce `onApplicationPrepareShutdown`**
- Refactor plugin resolving for reusability purposes

*We now register workflows before modules are loaded to ensure modules can run workflows as part of bootstrapping. E.g. the Redis Workflow Engine resumes workflows when it starts, which has until this change failed, because the workflows were not registered yet.

**We introduce a new hook to prepare resources for an application shutdown. E.g. closing the BullMQ worker as a preparatory step to closing the BullMQ queue. The worker will continue to process jobs while the queue is still open to receive new jobs (without processing them). 

Co-authored-by: Stevche Radevski <4820812+sradevski@users.noreply.github.com>
Co-authored-by: Darius <618221+edast@users.noreply.github.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Oli Juhl
2024-04-22 14:32:50 +02:00
committed by GitHub
parent 67a21c3e45
commit 0eb68541b8
24 changed files with 351 additions and 228 deletions

View File

@@ -1,3 +1,7 @@
import {
createShippingOptionsWorkflow,
updateShippingOptionsWorkflow,
} from "@medusajs/core-flows"
import { ModuleRegistrationName } from "@medusajs/modules-sdk"
import {
FulfillmentSetDTO,
@@ -8,16 +12,12 @@ import {
ShippingProfileDTO,
UpdateShippingOptionsWorkflowInput,
} from "@medusajs/types"
import { medusaIntegrationTestRunner } from "medusa-test-utils/dist"
import {
createShippingOptionsWorkflow,
updateShippingOptionsWorkflow,
} from "@medusajs/core-flows"
import {
ContainerRegistrationKeys,
remoteQueryObjectFromString,
RuleOperator,
remoteQueryObjectFromString,
} from "@medusajs/utils"
import { medusaIntegrationTestRunner } from "medusa-test-utils/dist"
jest.setTimeout(100000)

View File

@@ -62,10 +62,13 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
__hooks = {
onApplicationShutdown: async () => {
await this.bullWorker_?.close(true)
await this.queue_.close()
// eslint-disable-next-line max-len
this.eventBusRedisConnection_.disconnect()
},
onApplicationPrepareShutdown: async () => {
await this.bullWorker_?.close()
},
}
/**

View File

@@ -54,7 +54,7 @@ export default async function ({ port, cpus, directory }) {
const app = express()
const { dbConnection, shutdown } = await loaders({
const { dbConnection, shutdown, prepareShutdown } = await loaders({
directory,
expressApp: app,
})
@@ -72,10 +72,12 @@ export default async function ({ port, cpus, directory }) {
const gracefulShutDown = () => {
server
.shutdown()
.then(() => {
shutdown().then(() => {
process.exit(0)
})
.then(async () => {
return await prepareShutdown()
})
.then(async () => {
await shutdown()
process.exit(0)
})
.catch((e) => {
process.exit(1)

View File

@@ -19,7 +19,7 @@ export default async function ({ port, directory }) {
const app = express()
try {
const { dbConnection, shutdown } = await loaders({
const { dbConnection, shutdown, prepareShutdown } = await loaders({
directory,
expressApp: app,
})
@@ -37,13 +37,15 @@ export default async function ({ port, directory }) {
// Handle graceful shutdown
const gracefulShutDown = () => {
Logger.info("Gracefully shutting down server")
server
.shutdown()
.then(() => {
Logger.info("Gracefully stopping the server.")
shutdown().then(() => {
process.exit(0)
})
.then(async () => {
return await prepareShutdown()
})
.then(async () => {
await shutdown()
process.exit(0)
})
.catch((e) => {
Logger.error("Error received when shutting down the server.", e)

View File

@@ -0,0 +1,21 @@
import { PluginDetails } from "@medusajs/types"
import { glob } from "glob"
import { getResolvedPlugins } from "./resolve-plugins"
/**
* import files from the workflows directory to run the registration of the wofklows
* @param pluginDetails
*/
async function registerWorkflows(pluginDetails: PluginDetails): Promise<void> {
const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {})
await Promise.all(files.map(async (file) => import(file)))
}
export async function registerProjectWorkflows({
rootDirectory,
configModule,
}) {
const [resolved] =
getResolvedPlugins(rootDirectory, configModule, "dist", true) || []
await registerWorkflows(resolved)
}

View File

@@ -0,0 +1,151 @@
import { ConfigModule, PluginDetails } from "@medusajs/types"
import { isString } from "@medusajs/utils"
import fs from "fs"
import { sync as existsSync } from "fs-exists-cached"
import { createRequireFromPath } from "medusa-core-utils"
import path from "path"
import { MEDUSA_PROJECT_NAME } from "../plugins"
function createPluginId(name: string): string {
return name
}
function createFileContentHash(path, files): string {
return path + files
}
/**
* Finds the correct path for the plugin. If it is a local plugin it will be
* found in the plugins folder. Otherwise we will look for the plugin in the
* installed npm packages.
* @param {string} pluginName - the name of the plugin to find. Should match
* the name of the folder where the plugin is contained.
* @return {object} the plugin details
*/
function resolvePlugin(pluginName: string): {
resolve: string
id: string
name: string
options: Record<string, unknown>
version: string
} {
// Only find plugins when we're not given an absolute path
if (!existsSync(pluginName)) {
// Find the plugin in the local plugins folder
const resolvedPath = path.resolve(`./plugins/${pluginName}`)
if (existsSync(resolvedPath)) {
if (existsSync(`${resolvedPath}/package.json`)) {
const packageJSON = JSON.parse(
fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`)
)
const name = packageJSON.name || pluginName
// warnOnIncompatiblePeerDependency(name, packageJSON)
return {
resolve: resolvedPath,
name,
id: createPluginId(name),
options: {},
version:
packageJSON.version || createFileContentHash(resolvedPath, `**`),
}
} else {
// Make package.json a requirement for local plugins too
throw new Error(`Plugin ${pluginName} requires a package.json file`)
}
}
}
const rootDir = path.resolve(".")
/**
* Here we have an absolute path to an internal plugin, or a name of a module
* which should be located in node_modules.
*/
try {
const requireSource =
rootDir !== null
? createRequireFromPath(`${rootDir}/:internal:`)
: require
// If the path is absolute, resolve the directory of the internal plugin,
// otherwise resolve the directory containing the package.json
const resolvedPath = path.dirname(
requireSource.resolve(`${pluginName}/package.json`)
)
const packageJSON = JSON.parse(
fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`)
)
// warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON)
const computedResolvedPath =
resolvedPath + (process.env.DEV_MODE ? "/src" : "")
// Add support for a plugin to output the build into a dist directory
const resolvedPathToDist = resolvedPath + "/dist"
const isDistExist =
resolvedPathToDist &&
!process.env.DEV_MODE &&
existsSync(resolvedPath + "/dist")
return {
resolve: isDistExist ? resolvedPathToDist : computedResolvedPath,
id: createPluginId(packageJSON.name),
name: packageJSON.name,
options: {},
version: packageJSON.version,
}
} catch (err) {
throw new Error(
`Unable to find plugin "${pluginName}". Perhaps you need to install its package?`
)
}
}
export function getResolvedPlugins(
rootDirectory: string,
configModule: ConfigModule,
extensionDirectoryPath = "dist",
isMedusaProject = false
): undefined | PluginDetails[] {
const { plugins } = configModule
if (isMedusaProject) {
const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath)
return [
{
resolve: extensionDirectory,
name: MEDUSA_PROJECT_NAME,
id: createPluginId(MEDUSA_PROJECT_NAME),
options: configModule,
version: createFileContentHash(process.cwd(), `**`),
},
]
}
const resolved = plugins.map((plugin) => {
if (isString(plugin)) {
return resolvePlugin(plugin)
}
const details = resolvePlugin(plugin.resolve)
details.options = plugin.options
return details
})
const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath)
// Resolve user's project as a plugin for loading purposes
resolved.push({
resolve: extensionDirectory,
name: MEDUSA_PROJECT_NAME,
id: createPluginId(MEDUSA_PROJECT_NAME),
options: configModule,
version: createFileContentHash(process.cwd(), `**`),
})
return resolved
}

View File

@@ -6,8 +6,9 @@ import {
import { ConfigModule, MODULE_RESOURCE_TYPE } from "@medusajs/types"
import {
ContainerRegistrationKeys,
MedusaV2Flag,
isString,
MedusaV2Flag, promiseAll,
promiseAll,
} from "@medusajs/utils"
import { asValue } from "awilix"
import { Express, NextFunction, Request, Response } from "express"
@@ -24,6 +25,7 @@ import databaseLoader, { dataSource } from "./database"
import defaultsLoader from "./defaults"
import expressLoader from "./express"
import featureFlagsLoader from "./feature-flags"
import { registerProjectWorkflows } from "./helpers/register-workflows"
import medusaProjectApisLoader from "./load-medusa-project-apis"
import Logger from "./logger"
import loadMedusaApp, { mergeDefaultModules } from "./medusa-app"
@@ -102,7 +104,14 @@ async function loadMedusaV2({
[ContainerRegistrationKeys.REMOTE_QUERY]: asValue(null),
})
const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({
// Workflows are registered before the app to allow modules to run workflows as part of bootstrapping
// e.g. the workflow engine will resume workflows that were running when the server was shut down
await registerProjectWorkflows({ rootDirectory, configModule })
const {
onApplicationShutdown: medusaAppOnApplicationShutdown,
onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown,
} = await loadMedusaApp({
configModule,
container,
})
@@ -149,11 +158,13 @@ async function loadMedusaV2({
await createDefaultsWorkflow(container).run()
const shutdown = async () => {
await medusaAppOnApplicationShutdown()
await promiseAll([
container.dispose(),
pgConnection?.context?.destroy(),
expressShutdown(),
medusaAppOnApplicationShutdown()
medusaAppOnApplicationShutdown(),
])
}
@@ -163,6 +174,7 @@ async function loadMedusaV2({
app: expressApp,
pgConnection,
shutdown,
prepareShutdown: medusaAppOnApplicationPrepareShutdown,
}
}
@@ -177,6 +189,7 @@ export default async ({
app: Express
pgConnection: unknown
shutdown: () => Promise<void>
prepareShutdown: () => Promise<void>
}> => {
const configModule = loadConfig(rootDirectory)
const featureFlagRouter = featureFlagsLoader(configModule, Logger)
@@ -211,7 +224,11 @@ export default async ({
featureFlagRouter: asValue(featureFlagRouter),
})
const { shutdown: redisShutdown } = await redisLoader({ container, configModule, logger: Logger })
const { shutdown: redisShutdown } = await redisLoader({
container,
configModule,
logger: Logger,
})
const modelsActivity = Logger.activity(`Initializing models${EOL}`)
track("MODELS_INIT_STARTED")
@@ -271,7 +288,10 @@ export default async ({
track("MODULES_INIT_STARTED")
// Move before services init once all modules are migrated and do not rely on core resources anymore
const { onApplicationShutdown: medusaAppOnApplicationShutdown } = await loadMedusaApp({
const {
onApplicationShutdown: medusaAppOnApplicationShutdown,
onApplicationPrepareShutdown: medusaAppOnApplicationPrepareShutdown,
} = await loadMedusaApp({
configModule,
container,
})
@@ -281,7 +301,10 @@ export default async ({
const expActivity = Logger.activity(`Initializing express${EOL}`)
track("EXPRESS_INIT_STARTED")
const { shutdown: expressShutdown } = await expressLoader({ app: expressApp, configModule })
const { shutdown: expressShutdown } = await expressLoader({
app: expressApp,
configModule,
})
await passportLoader({ app: expressApp, configModule })
const exAct = Logger.success(expActivity, "Express intialized") || {}
track("EXPRESS_INIT_COMPLETED", { duration: exAct.duration })
@@ -339,13 +362,14 @@ export default async ({
track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration })
async function shutdown() {
await medusaAppOnApplicationShutdown()
await promiseAll([
container.dispose(),
dbConnection?.destroy(),
pgConnection?.context?.destroy(),
redisShutdown(),
expressShutdown(),
medusaAppOnApplicationShutdown(),
])
}
@@ -356,5 +380,6 @@ export default async ({
app: expressApp,
pgConnection,
shutdown,
prepareShutdown: async () => medusaAppOnApplicationPrepareShutdown(),
}
}

View File

@@ -1,12 +1,12 @@
import { promiseAll } from "@medusajs/utils"
import { Express } from "express"
import glob from "glob"
import _ from "lodash"
import { trackInstallation } from "medusa-telemetry"
import { EOL } from "os"
import path from "path"
import { ConfigModule, Logger, MedusaContainer } from "../types/global"
import ScheduledJobsLoader from "./helpers/jobs"
import { getResolvedPlugins } from "./helpers/resolve-plugins"
import { RoutesLoader } from "./helpers/routing"
import { SubscriberLoader } from "./helpers/subscribers"
import logger from "./logger"
@@ -55,7 +55,6 @@ export default async ({
)
}
await registerSubscribers(pluginDetails, container, activityId)
await registerWorkflows(pluginDetails)
})
)
@@ -78,23 +77,6 @@ export default async ({
resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin"))
}
function getResolvedPlugins(
rootDirectory: string,
configModule: ConfigModule,
extensionDirectoryPath = "dist"
): undefined | PluginDetails[] {
const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath)
return [
{
resolve: extensionDirectory,
name: MEDUSA_PROJECT_NAME,
id: createPluginId(MEDUSA_PROJECT_NAME),
options: configModule,
version: createFileContentHash(process.cwd(), `**`),
},
]
}
async function runLoaders(
pluginDetails: PluginDetails,
container: MedusaContainer
@@ -191,21 +173,3 @@ async function registerSubscribers(
true
).load()
}
/**
* import files from the workflows directory to run the registration of the wofklows
* @param pluginDetails
*/
async function registerWorkflows(pluginDetails: PluginDetails): Promise<void> {
const files = glob.sync(`${pluginDetails.resolve}/workflows/*.js`, {})
await Promise.all(files.map(async (file) => import(file)))
}
// TODO: Create unique id for each plugin
function createPluginId(name: string): string {
return name
}
function createFileContentHash(path, files): string {
return path + files
}

View File

@@ -3,13 +3,9 @@ import {
promiseAll,
upperCaseFirst,
} from "@medusajs/utils"
import { aliasTo, asFunction, asValue, Lifetime } from "awilix"
import { Lifetime, aliasTo, asFunction, asValue } from "awilix"
import { Express } from "express"
import fs from "fs"
import { sync as existsSync } from "fs-exists-cached"
import glob from "glob"
import _ from "lodash"
import { createRequireFromPath } from "medusa-core-utils"
import { OauthService } from "medusa-interfaces"
import { trackInstallation } from "medusa-telemetry"
import { EOL } from "os"
@@ -41,6 +37,7 @@ import {
registerAbstractFulfillmentServiceFromClass,
registerPaymentProcessorFromClass,
} from "./helpers/plugins"
import { getResolvedPlugins } from "./helpers/resolve-plugins"
import { RoutesLoader } from "./helpers/routing"
import { SubscriberLoader } from "./helpers/subscribers"
import logger from "./logger"
@@ -121,37 +118,6 @@ export default async ({
resolved.forEach((plugin) => trackInstallation(plugin.name, "plugin"))
}
function getResolvedPlugins(
rootDirectory: string,
configModule: ConfigModule,
extensionDirectoryPath = "dist"
): undefined | PluginDetails[] {
const { plugins } = configModule
const resolved = plugins.map((plugin) => {
if (_.isString(plugin)) {
return resolvePlugin(plugin)
}
const details = resolvePlugin(plugin.resolve)
details.options = plugin.options
return details
})
const extensionDirectory = path.join(rootDirectory, extensionDirectoryPath)
// Resolve user's project as a plugin for loading purposes
resolved.push({
resolve: extensionDirectory,
name: MEDUSA_PROJECT_NAME,
id: createPluginId(MEDUSA_PROJECT_NAME),
options: configModule,
version: createFileContentHash(process.cwd(), `**`),
})
return resolved
}
export async function registerPluginModels({
rootDirectory,
container,
@@ -753,102 +719,3 @@ async function runSetupFunctions(pluginDetails: PluginDetails): Promise<void> {
})
)
}
// TODO: Create unique id for each plugin
function createPluginId(name: string): string {
return name
}
/**
* Finds the correct path for the plugin. If it is a local plugin it will be
* found in the plugins folder. Otherwise we will look for the plugin in the
* installed npm packages.
* @param {string} pluginName - the name of the plugin to find. Should match
* the name of the folder where the plugin is contained.
* @return {object} the plugin details
*/
function resolvePlugin(pluginName: string): {
resolve: string
id: string
name: string
options: Record<string, unknown>
version: string
} {
// Only find plugins when we're not given an absolute path
if (!existsSync(pluginName)) {
// Find the plugin in the local plugins folder
const resolvedPath = path.resolve(`./plugins/${pluginName}`)
if (existsSync(resolvedPath)) {
if (existsSync(`${resolvedPath}/package.json`)) {
const packageJSON = JSON.parse(
fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`)
)
const name = packageJSON.name || pluginName
// warnOnIncompatiblePeerDependency(name, packageJSON)
return {
resolve: resolvedPath,
name,
id: createPluginId(name),
options: {},
version:
packageJSON.version || createFileContentHash(resolvedPath, `**`),
}
} else {
// Make package.json a requirement for local plugins too
throw new Error(`Plugin ${pluginName} requires a package.json file`)
}
}
}
const rootDir = path.resolve(".")
/**
* Here we have an absolute path to an internal plugin, or a name of a module
* which should be located in node_modules.
*/
try {
const requireSource =
rootDir !== null
? createRequireFromPath(`${rootDir}/:internal:`)
: require
// If the path is absolute, resolve the directory of the internal plugin,
// otherwise resolve the directory containing the package.json
const resolvedPath = path.dirname(
requireSource.resolve(`${pluginName}/package.json`)
)
const packageJSON = JSON.parse(
fs.readFileSync(`${resolvedPath}/package.json`, `utf-8`)
)
// warnOnIncompatiblePeerDependency(packageJSON.name, packageJSON)
const computedResolvedPath =
resolvedPath + (process.env.DEV_MODE ? "/src" : "")
// Add support for a plugin to output the build into a dist directory
const resolvedPathToDist = resolvedPath + "/dist"
const isDistExist =
resolvedPathToDist &&
!process.env.DEV_MODE &&
existsSync(resolvedPath + "/dist")
return {
resolve: isDistExist ? resolvedPathToDist : computedResolvedPath,
id: createPluginId(packageJSON.name),
name: packageJSON.name,
options: {},
version: packageJSON.version,
}
} catch (err) {
throw new Error(
`Unable to find plugin "${pluginName}". Perhaps you need to install its package?`
)
}
}
function createFileContentHash(path, files): string {
return path + files
}

View File

@@ -141,6 +141,7 @@ export async function loadInternalModule(
if (loaderOnly) {
// The expectation is only to run the loader as standalone, so we do not need to register the service and we need to cleanup all services
const service = container.resolve(registrationName)
await service.__hooks?.onApplicationPrepareShutdown()
await service.__hooks?.onApplicationShutdown()
}
}

View File

@@ -204,6 +204,7 @@ export type MedusaAppOutput = {
notFound?: Record<string, Record<string, string>>
runMigrations: RunMigrationFn
onApplicationShutdown: () => Promise<void>
onApplicationPrepareShutdown: () => Promise<void>
}
export type MedusaAppOptions = {
@@ -243,6 +244,7 @@ async function MedusaApp_({
migrationOnly?: boolean
} = {}): Promise<MedusaAppOutput> {
const sharedContainer_ = createMedusaContainer({}, sharedContainer)
const onApplicationShutdown = async () => {
await promiseAll([
MedusaModule.onApplicationShutdown(),
@@ -250,6 +252,10 @@ async function MedusaApp_({
])
}
const onApplicationPrepareShutdown = async () => {
await promiseAll([MedusaModule.onApplicationPrepareShutdown()])
}
const modules: MedusaModuleConfig =
modulesConfig ??
(
@@ -312,6 +318,7 @@ async function MedusaApp_({
if (loaderOnly) {
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
modules: allModules,
link: undefined,
query: async () => {
@@ -388,6 +395,7 @@ async function MedusaApp_({
return {
onApplicationShutdown,
onApplicationPrepareShutdown,
modules: allModules,
link: remoteLink,
query,

View File

@@ -136,6 +136,23 @@ export class MedusaModule {
)
}
public static async onApplicationPrepareShutdown(): Promise<void> {
await promiseAll(
[...MedusaModule.instances_.values()]
.map((instances) => {
return Object.values(instances).map((instance: IModuleService) => {
return instance.__hooks?.onApplicationPrepareShutdown
?.bind(instance)()
.catch(() => {
// The module should handle this and log it
return void 0
})
})
})
.flat()
)
}
public static clearInstances(): void {
MedusaModule.instances_.clear()
MedusaModule.modules_.clear()

View File

@@ -752,6 +752,9 @@ export class TransactionOrchestrator extends EventEmitter {
await setStepFailure(error)
})
const eventName = DistributedTransactionEvent.STEP_AWAITING
transaction.emit(eventName, { step, transaction })
})
)
}

View File

@@ -137,6 +137,7 @@ export enum DistributedTransactionEvent {
STEP_BEGIN = "stepBegin",
STEP_SUCCESS = "stepSuccess",
STEP_FAILURE = "stepFailure",
STEP_AWAITING = "stepAwaiting",
COMPENSATE_STEP_SUCCESS = "compensateStepSuccess",
COMPENSATE_STEP_FAILURE = "compensateStepFailure",
}
@@ -166,6 +167,11 @@ export type DistributedTransactionEvents = {
transaction: DistributedTransaction
}) => void
onStepAwaiting?: (args: {
step: TransactionStep
transaction: DistributedTransaction
}) => void
onCompensateBegin?: (args: { transaction: DistributedTransaction }) => void
onCompensateStepSuccess?: (args: {

View File

@@ -70,6 +70,10 @@ export class GlobalWorkflow extends WorkflowManager {
transaction.once("stepFailure", this.subscribe.onStepFailure)
}
if (this.subscribe.onStepAwaiting) {
transaction.once("stepAwaiting", this.subscribe.onStepAwaiting)
}
await orchestrator.resume(transaction)
return transaction

View File

@@ -244,6 +244,13 @@ export class LocalWorkflow {
)
}
if (subscribe?.onStepAwaiting) {
transaction.on(
DistributedTransactionEvent.STEP_AWAITING,
eventWrapperMap.get("onStepAwaiting")
)
}
if (subscribe?.onCompensateStepSuccess) {
transaction.on(
DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS,

View File

@@ -3,8 +3,8 @@ import {
InternalModuleDeclaration,
} from "../modules-sdk"
import { LoggerOptions } from "typeorm"
import { RedisOptions } from "ioredis"
import { LoggerOptions } from "typeorm"
/**
* @interface
@@ -744,3 +744,11 @@ export type ConfigModule = {
*/
featureFlags: Record<string, boolean | string>
}
export type PluginDetails = {
resolve: string
name: string
id: string
options: Record<string, unknown>
version: string
}

View File

@@ -297,5 +297,6 @@ export interface IModuleService {
__hooks?: {
onApplicationStart?: () => Promise<void>
onApplicationShutdown?: () => Promise<void>
onApplicationPrepareShutdown?: () => Promise<void>
}
}

View File

@@ -483,11 +483,20 @@ export class WorkflowOrchestratorService {
notify({ eventType: "onStepFailure", step, errors })
},
onStepAwaiting: ({ step, transaction }) => {
customEventHandlers?.onStepAwaiting?.({ step, transaction })
notify({ eventType: "onStepAwaiting", step })
},
onCompensateStepSuccess: ({ step, transaction }) => {
const stepName = step.definition.action!
const response = transaction.getContext().compensate[stepName]
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
customEventHandlers?.onCompensateStepSuccess?.({
step,
transaction,
response,
})
notify({ eventType: "onCompensateStepSuccess", step, response })
},

View File

@@ -53,7 +53,7 @@
"@mikro-orm/migrations": "5.9.7",
"@mikro-orm/postgresql": "5.9.7",
"awilix": "^8.0.0",
"bullmq": "^5.1.3",
"bullmq": "^5.4.2",
"dotenv": "^16.4.5",
"ioredis": "^5.3.2",
"knex": "2.4.2"

View File

@@ -4,8 +4,13 @@ import {
TransactionHandlerType,
TransactionStep,
} from "@medusajs/orchestration"
import { ContainerLike, Context, MedusaContainer } from "@medusajs/types"
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
import {
ContainerLike,
Context,
Logger,
MedusaContainer,
} from "@medusajs/types"
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
import {
FlowRunOptions,
MedusaWorkflow,
@@ -75,6 +80,8 @@ export class WorkflowOrchestratorService {
protected redisPublisher: Redis
protected redisSubscriber: Redis
private subscribers: Subscribers = new Map()
private activeStepsCount: number = 0
private logger: Logger
protected redisDistributedTransactionStorage_: RedisDistributedTransactionStorage
@@ -83,15 +90,18 @@ export class WorkflowOrchestratorService {
redisDistributedTransactionStorage,
redisPublisher,
redisSubscriber,
logger,
}: {
dataLoaderOnly: boolean
redisDistributedTransactionStorage: RedisDistributedTransactionStorage
workflowOrchestratorService: WorkflowOrchestratorService
redisPublisher: Redis
redisSubscriber: Redis
logger: Logger
}) {
this.redisPublisher = redisPublisher
this.redisSubscriber = redisSubscriber
this.logger = logger
redisDistributedTransactionStorage.setWorkflowOrchestratorService(this)
@@ -113,6 +123,15 @@ export class WorkflowOrchestratorService {
await this.redisDistributedTransactionStorage_.onApplicationShutdown()
}
async onApplicationPrepareShutdown() {
// eslint-disable-next-line max-len
await this.redisDistributedTransactionStorage_.onApplicationPrepareShutdown()
while (this.activeStepsCount > 0) {
await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
@@ -523,6 +542,7 @@ export class WorkflowOrchestratorService {
onStepBegin: async ({ step, transaction }) => {
customEventHandlers?.onStepBegin?.({ step, transaction })
this.activeStepsCount++
await notify({ eventType: "onStepBegin", step })
},
@@ -533,8 +553,9 @@ export class WorkflowOrchestratorService {
transaction
)
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
await notify({ eventType: "onStepSuccess", step, response })
this.activeStepsCount--
},
onStepFailure: async ({ step, transaction }) => {
const stepName = step.definition.action!
@@ -543,14 +564,28 @@ export class WorkflowOrchestratorService {
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onStepFailure", step, errors })
this.activeStepsCount--
},
onStepAwaiting: async ({ step, transaction }) => {
customEventHandlers?.onStepAwaiting?.({ step, transaction })
await notify({ eventType: "onStepAwaiting", step })
if (!step.definition.backgroundExecution) {
this.activeStepsCount--
}
},
onCompensateStepSuccess: async ({ step, transaction }) => {
const stepName = step.definition.action!
const response = transaction.getContext().compensate[stepName]
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
customEventHandlers?.onCompensateStepSuccess?.({
step,
transaction,
response,
})
await notify({ eventType: "onCompensateStepSuccess", step, response })
},

View File

@@ -59,6 +59,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
await this.workflowOrchestratorService_.onApplicationShutdown()
await this.redisDisconnectHandler_()
},
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
}
@InjectManager("baseRepository_")

View File

@@ -64,9 +64,13 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
)
}
async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}
async onApplicationShutdown() {
await this.queue.close()
await this.worker.close(true)
}
setWorkflowOrchestratorService(workflowOrchestratorService) {

View File

@@ -9221,7 +9221,7 @@ __metadata:
"@mikro-orm/migrations": 5.9.7
"@mikro-orm/postgresql": 5.9.7
awilix: ^8.0.0
bullmq: ^5.1.3
bullmq: ^5.4.2
cross-env: ^5.2.1
dotenv: ^16.4.5
ioredis: ^5.3.2
@@ -22504,36 +22504,18 @@ __metadata:
languageName: node
linkType: hard
"bullmq@npm:^5.1.3":
version: 5.1.3
resolution: "bullmq@npm:5.1.3"
dependencies:
cron-parser: ^4.6.0
glob: ^8.0.3
ioredis: ^5.3.2
lodash: ^4.17.21
msgpackr: ^1.10.1
node-abort-controller: ^3.1.1
semver: ^7.5.4
tslib: ^2.0.0
uuid: ^9.0.0
checksum: dc2177dfd736b2d008ccab1ba9f77f80cc730ce6197c9ffa0f37327e1cf34bd8b97d83ee9f9008253ef0c0854bbd04f8c925889a3370a0899e8f5c7a34fd3ab3
languageName: node
linkType: hard
"bullmq@npm:^5.4.2":
version: 5.4.2
resolution: "bullmq@npm:5.4.2"
version: 5.7.3
resolution: "bullmq@npm:5.7.3"
dependencies:
cron-parser: ^4.6.0
ioredis: ^5.3.2
lodash: ^4.17.21
msgpackr: ^1.10.1
node-abort-controller: ^3.1.1
semver: ^7.5.4
tslib: ^2.0.0
uuid: ^9.0.0
checksum: 01687a41bbacb646ab9cac181181c0cf18362f23b250c332125e0a701dacefb48ff20b030e095f24bd5e471f29d514bcc02779fd1f1c3cb830a3a5f1d0a39d75
checksum: eade5853736a9ad606fe6f7b0d000585d7122ca3f5e4b71965cef448b77f3b8748f90ea420b90cf8e2fd7db737197946cc57ffbdb7ea8e8e4945875cc54b74d5
languageName: node
linkType: hard