feat: Add support for scheduled workflows (#7651)
We still need to: But wanted to open the PR for early feedback on the approach
This commit is contained in:
@@ -0,0 +1,18 @@
|
||||
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
|
||||
|
||||
export class MockSchedulerStorage implements IDistributedSchedulerStorage {
|
||||
async schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
async remove(jobId: string): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
async removeAll(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,10 @@
|
||||
import { GlobalWorkflow } from "../../workflow/global-workflow"
|
||||
import { TransactionState } from "../../transaction/types"
|
||||
import { WorkflowManager } from "../../workflow/workflow-manager"
|
||||
import { WorkflowScheduler } from "../../workflow/scheduler"
|
||||
import { MockSchedulerStorage } from "../../__fixtures__/mock-scheduler-storage"
|
||||
|
||||
WorkflowScheduler.setStorage(new MockSchedulerStorage())
|
||||
|
||||
describe("WorkflowManager", () => {
|
||||
const container: any = {}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import { MockSchedulerStorage } from "../../__fixtures__/mock-scheduler-storage"
|
||||
import { TransactionState } from "../../transaction/types"
|
||||
import { LocalWorkflow } from "../../workflow/local-workflow"
|
||||
import { WorkflowScheduler } from "../../workflow/scheduler"
|
||||
import { WorkflowManager } from "../../workflow/workflow-manager"
|
||||
|
||||
WorkflowScheduler.setStorage(new MockSchedulerStorage())
|
||||
|
||||
describe("WorkflowManager", () => {
|
||||
const container: any = {}
|
||||
|
||||
|
||||
@@ -3,6 +3,18 @@ import {
|
||||
TransactionCheckpoint,
|
||||
} from "../distributed-transaction"
|
||||
import { TransactionStep } from "../transaction-step"
|
||||
import { SchedulerOptions } from "../types"
|
||||
|
||||
export interface IDistributedSchedulerStorage {
|
||||
schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void>
|
||||
|
||||
remove(jobId: string): Promise<void>
|
||||
|
||||
removeAll(): Promise<void>
|
||||
}
|
||||
|
||||
export interface IDistributedTransactionStorage {
|
||||
get(key: string): Promise<TransactionCheckpoint | undefined>
|
||||
@@ -36,6 +48,29 @@ export interface IDistributedTransactionStorage {
|
||||
): Promise<void>
|
||||
}
|
||||
|
||||
export abstract class DistributedSchedulerStorage
|
||||
implements IDistributedSchedulerStorage
|
||||
{
|
||||
constructor() {
|
||||
/* noop */
|
||||
}
|
||||
|
||||
async schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void> {
|
||||
throw new Error("Method 'schedule' not implemented.")
|
||||
}
|
||||
|
||||
async remove(jobId: string): Promise<void> {
|
||||
throw new Error("Method 'remove' not implemented.")
|
||||
}
|
||||
|
||||
async removeAll(): Promise<void> {
|
||||
throw new Error("Method 'removeAll' not implemented.")
|
||||
}
|
||||
}
|
||||
|
||||
export abstract class DistributedTransactionStorage
|
||||
implements IDistributedTransactionStorage
|
||||
{
|
||||
|
||||
@@ -7,7 +7,11 @@ import {
|
||||
TransactionOrchestrator,
|
||||
} from "./transaction-orchestrator"
|
||||
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
|
||||
import { TransactionHandlerType, TransactionState } from "./types"
|
||||
import {
|
||||
SchedulerOptions,
|
||||
TransactionHandlerType,
|
||||
TransactionState,
|
||||
} from "./types"
|
||||
|
||||
/**
|
||||
* @typedef TransactionMetadata
|
||||
|
||||
@@ -118,9 +118,32 @@ export type TransactionModelOptions = {
|
||||
*/
|
||||
storeExecution?: boolean
|
||||
|
||||
/**
|
||||
* Defines the workflow as a scheduled workflow that executes based on the cron configuration passed.
|
||||
* The value can either by a cron expression string, or an object that also allows to define the concurrency behavior.
|
||||
*/
|
||||
schedule?: string | SchedulerOptions
|
||||
|
||||
// TODO: add metadata field for customizations
|
||||
}
|
||||
|
||||
export type SchedulerOptions = {
|
||||
/**
|
||||
* The cron expression to schedule the workflow execution.
|
||||
*/
|
||||
cron: string
|
||||
/**
|
||||
* Setting whether to allow concurrent executions (eg. if the previous execution is still running, should the new one be allowed to run or not)
|
||||
* By default concurrent executions are not allowed.
|
||||
*/
|
||||
concurrency?: "allow" | "forbid"
|
||||
|
||||
/**
|
||||
* Optionally limit the number of executions for the scheduled workflow. If not set, the workflow will run indefinitely.
|
||||
*/
|
||||
numberOfExecutions?: number
|
||||
}
|
||||
|
||||
export type TransactionModel = {
|
||||
id: string
|
||||
flow: TransactionStepsDefinition
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
export * from "./workflow-manager"
|
||||
export * from "./local-workflow"
|
||||
export * from "./global-workflow"
|
||||
export * from "./scheduler"
|
||||
|
||||
42
packages/core/orchestration/src/workflow/scheduler.ts
Normal file
42
packages/core/orchestration/src/workflow/scheduler.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import { MedusaError } from "@medusajs/utils"
|
||||
import { IDistributedSchedulerStorage, SchedulerOptions } from "../transaction"
|
||||
import { WorkflowDefinition } from "./workflow-manager"
|
||||
|
||||
export class WorkflowScheduler {
|
||||
private static storage: IDistributedSchedulerStorage
|
||||
public static setStorage(storage: IDistributedSchedulerStorage) {
|
||||
this.storage = storage
|
||||
}
|
||||
|
||||
public async scheduleWorkflow(workflow: WorkflowDefinition) {
|
||||
const schedule = workflow.options?.schedule
|
||||
if (!schedule) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.INVALID_ARGUMENT,
|
||||
"Workflow schedule is not defined while registering a scheduled workflow"
|
||||
)
|
||||
}
|
||||
|
||||
const normalizedSchedule: SchedulerOptions =
|
||||
typeof schedule === "string"
|
||||
? {
|
||||
cron: schedule,
|
||||
concurrency: "forbid",
|
||||
}
|
||||
: {
|
||||
cron: schedule.cron,
|
||||
concurrency: schedule.concurrency || "forbid",
|
||||
numberOfExecutions: schedule.numberOfExecutions,
|
||||
}
|
||||
|
||||
await WorkflowScheduler.storage.schedule(workflow.id, normalizedSchedule)
|
||||
}
|
||||
|
||||
public async clearWorkflow(workflow: WorkflowDefinition) {
|
||||
await WorkflowScheduler.storage.remove(workflow.id)
|
||||
}
|
||||
|
||||
public async clear() {
|
||||
await WorkflowScheduler.storage.removeAll()
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
TransactionStepHandler,
|
||||
TransactionStepsDefinition,
|
||||
} from "../transaction"
|
||||
import { WorkflowScheduler } from "./scheduler"
|
||||
|
||||
export interface WorkflowDefinition {
|
||||
id: string
|
||||
@@ -51,13 +52,20 @@ export type WorkflowStepHandler = (
|
||||
|
||||
export class WorkflowManager {
|
||||
protected static workflows: Map<string, WorkflowDefinition> = new Map()
|
||||
protected static scheduler = new WorkflowScheduler()
|
||||
|
||||
static unregister(workflowId: string) {
|
||||
const workflow = WorkflowManager.workflows.get(workflowId)
|
||||
if (workflow?.options.schedule) {
|
||||
this.scheduler.clearWorkflow(workflow)
|
||||
}
|
||||
|
||||
WorkflowManager.workflows.delete(workflowId)
|
||||
}
|
||||
|
||||
static unregisterAll() {
|
||||
WorkflowManager.workflows.clear()
|
||||
this.scheduler.clear()
|
||||
}
|
||||
|
||||
static getWorkflows() {
|
||||
@@ -111,7 +119,7 @@ export class WorkflowManager {
|
||||
}
|
||||
}
|
||||
|
||||
WorkflowManager.workflows.set(workflowId, {
|
||||
const workflow = {
|
||||
id: workflowId,
|
||||
flow_: finalFlow!,
|
||||
orchestrator: new TransactionOrchestrator(
|
||||
@@ -124,7 +132,12 @@ export class WorkflowManager {
|
||||
options,
|
||||
requiredModules,
|
||||
optionalModules,
|
||||
})
|
||||
}
|
||||
|
||||
WorkflowManager.workflows.set(workflowId, workflow)
|
||||
if (options.schedule) {
|
||||
this.scheduler.scheduleWorkflow(workflow)
|
||||
}
|
||||
}
|
||||
|
||||
static update(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { WorkflowManager } from "@medusajs/orchestration"
|
||||
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
|
||||
import { promiseAll } from "@medusajs/utils"
|
||||
import {
|
||||
createStep,
|
||||
@@ -11,6 +11,24 @@ import {
|
||||
} from "../.."
|
||||
|
||||
jest.setTimeout(30000)
|
||||
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
|
||||
|
||||
class MockSchedulerStorage implements IDistributedSchedulerStorage {
|
||||
async schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
async remove(jobId: string): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
async removeAll(): Promise<void> {
|
||||
return Promise.resolve()
|
||||
}
|
||||
}
|
||||
|
||||
WorkflowScheduler.setStorage(new MockSchedulerStorage())
|
||||
|
||||
const afterEach_ = () => {
|
||||
jest.clearAllMocks()
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
import { SchedulerOptions } from "@medusajs/orchestration"
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
StepResponse,
|
||||
} from "@medusajs/workflows-sdk"
|
||||
|
||||
export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
|
||||
const workflowScheduledStepInvoke = jest.fn((input, context) => {
|
||||
return new StepResponse({})
|
||||
})
|
||||
|
||||
const step = createStep("step_1", workflowScheduledStepInvoke)
|
||||
|
||||
createWorkflow(
|
||||
{ name, schedule: schedule ?? "* * * * * *" },
|
||||
function (input) {
|
||||
return step(input)
|
||||
}
|
||||
)
|
||||
|
||||
return workflowScheduledStepInvoke
|
||||
}
|
||||
@@ -3,9 +3,10 @@ import { RemoteJoinerQuery } from "@medusajs/types"
|
||||
import { TransactionHandlerType } from "@medusajs/utils"
|
||||
import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
|
||||
import { knex } from "knex"
|
||||
import { setTimeout } from "timers/promises"
|
||||
import { setTimeout as setTimeoutPromise } from "timers/promises"
|
||||
import "../__fixtures__"
|
||||
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
|
||||
import { createScheduled } from "../__fixtures__/workflow_scheduled"
|
||||
import { DB_URL, TestDatabase } from "../utils"
|
||||
|
||||
const sharedPgConnection = knex<any, any>({
|
||||
@@ -24,42 +25,41 @@ const afterEach_ = async () => {
|
||||
jest.setTimeout(50000)
|
||||
|
||||
describe("Workflow Orchestrator module", function () {
|
||||
describe("Testing basic workflow", function () {
|
||||
let workflowOrcModule: IWorkflowEngineService
|
||||
let query: (
|
||||
query: string | RemoteJoinerQuery | object,
|
||||
variables?: Record<string, unknown>
|
||||
) => Promise<any>
|
||||
let workflowOrcModule: IWorkflowEngineService
|
||||
let query: (
|
||||
query: string | RemoteJoinerQuery | object,
|
||||
variables?: Record<string, unknown>
|
||||
) => Promise<any>
|
||||
|
||||
afterEach(afterEach_)
|
||||
afterEach(afterEach_)
|
||||
|
||||
beforeAll(async () => {
|
||||
const {
|
||||
runMigrations,
|
||||
query: remoteQuery,
|
||||
modules,
|
||||
} = await MedusaApp({
|
||||
sharedResourcesConfig: {
|
||||
database: {
|
||||
connection: sharedPgConnection,
|
||||
},
|
||||
beforeAll(async () => {
|
||||
const {
|
||||
runMigrations,
|
||||
query: remoteQuery,
|
||||
modules,
|
||||
} = await MedusaApp({
|
||||
sharedResourcesConfig: {
|
||||
database: {
|
||||
connection: sharedPgConnection,
|
||||
},
|
||||
modulesConfig: {
|
||||
workflows: {
|
||||
resolve: __dirname + "/../..",
|
||||
},
|
||||
},
|
||||
modulesConfig: {
|
||||
workflows: {
|
||||
resolve: __dirname + "/../..",
|
||||
},
|
||||
})
|
||||
|
||||
query = remoteQuery
|
||||
|
||||
await runMigrations()
|
||||
|
||||
workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService
|
||||
},
|
||||
})
|
||||
|
||||
afterEach(afterEach_)
|
||||
query = remoteQuery
|
||||
|
||||
await runMigrations()
|
||||
|
||||
workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService
|
||||
})
|
||||
|
||||
afterEach(afterEach_)
|
||||
describe("Testing basic workflow", function () {
|
||||
it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => {
|
||||
await workflowOrcModule.run("workflow_1", {
|
||||
input: {
|
||||
@@ -168,7 +168,7 @@ describe("Workflow Orchestrator module", function () {
|
||||
}
|
||||
)
|
||||
|
||||
await setTimeout(200)
|
||||
await setTimeoutPromise(200)
|
||||
|
||||
expect(transaction.flow.state).toEqual("reverted")
|
||||
})
|
||||
@@ -201,4 +201,43 @@ describe("Workflow Orchestrator module", function () {
|
||||
expect(onFinish).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe("Scheduled workflows", () => {
|
||||
beforeAll(() => {
|
||||
jest.useFakeTimers()
|
||||
jest.spyOn(global, "setTimeout")
|
||||
})
|
||||
|
||||
afterAll(() => {
|
||||
jest.useRealTimers()
|
||||
})
|
||||
|
||||
it("should execute a scheduled workflow", async () => {
|
||||
const spy = createScheduled("standard")
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(setTimeout).toHaveBeenCalledTimes(2)
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(setTimeout).toHaveBeenCalledTimes(3)
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it("should stop executions after the set number of executions", async () => {
|
||||
const spy = await createScheduled("num-executions", {
|
||||
cron: "* * * * * *",
|
||||
numberOfExecutions: 2,
|
||||
})
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -53,6 +53,7 @@
|
||||
"@mikro-orm/migrations": "5.9.7",
|
||||
"@mikro-orm/postgresql": "5.9.7",
|
||||
"awilix": "^8.0.0",
|
||||
"cron-parser": "^4.9.0",
|
||||
"dotenv": "^16.4.5",
|
||||
"knex": "2.4.2"
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
DistributedTransactionEvents,
|
||||
TransactionHandlerType,
|
||||
TransactionStep,
|
||||
WorkflowScheduler,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ContainerLike, Context, MedusaContainer } from "@medusajs/types"
|
||||
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
|
||||
@@ -83,6 +84,7 @@ export class WorkflowOrchestratorService {
|
||||
}) {
|
||||
inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this)
|
||||
DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage)
|
||||
WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage)
|
||||
}
|
||||
|
||||
@InjectSharedContext()
|
||||
|
||||
@@ -1,19 +1,34 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionStorage,
|
||||
IDistributedSchedulerStorage,
|
||||
IDistributedTransactionStorage,
|
||||
SchedulerOptions,
|
||||
TransactionCheckpoint,
|
||||
TransactionStep,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ModulesSdkTypes } from "@medusajs/types"
|
||||
import { TransactionState } from "@medusajs/utils"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import { CronExpression, parseExpression } from "cron-parser"
|
||||
|
||||
// eslint-disable-next-line max-len
|
||||
export class InMemoryDistributedTransactionStorage extends DistributedTransactionStorage {
|
||||
export class InMemoryDistributedTransactionStorage
|
||||
implements IDistributedTransactionStorage, IDistributedSchedulerStorage
|
||||
{
|
||||
private workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
|
||||
private workflowOrchestratorService_: WorkflowOrchestratorService
|
||||
|
||||
private storage: Map<string, TransactionCheckpoint> = new Map()
|
||||
private scheduled: Map<
|
||||
string,
|
||||
{
|
||||
timer: NodeJS.Timeout
|
||||
expression: CronExpression
|
||||
numberOfExecutions: number
|
||||
config: SchedulerOptions
|
||||
}
|
||||
> = new Map()
|
||||
private retries: Map<string, unknown> = new Map()
|
||||
private timeouts: Map<string, unknown> = new Map()
|
||||
|
||||
@@ -22,8 +37,6 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio
|
||||
}: {
|
||||
workflowExecutionService: ModulesSdkTypes.InternalModuleService<any>
|
||||
}) {
|
||||
super()
|
||||
|
||||
this.workflowExecutionService_ = workflowExecutionService
|
||||
}
|
||||
|
||||
@@ -215,4 +228,78 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio
|
||||
this.timeouts.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
/* Scheduler storage methods */
|
||||
async schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void> {
|
||||
const jobId =
|
||||
typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId
|
||||
|
||||
// In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one
|
||||
// any only then we add the new one.
|
||||
await this.remove(jobId)
|
||||
const expression = parseExpression(schedulerOptions.cron)
|
||||
const nextExecution = expression.next().getTime() - Date.now()
|
||||
|
||||
const timer = setTimeout(async () => {
|
||||
this.jobHandler(jobId)
|
||||
}, nextExecution)
|
||||
|
||||
this.scheduled.set(jobId, {
|
||||
timer,
|
||||
expression,
|
||||
numberOfExecutions: 0,
|
||||
config: schedulerOptions,
|
||||
})
|
||||
}
|
||||
|
||||
async remove(jobId: string): Promise<void> {
|
||||
const job = this.scheduled.get(jobId)
|
||||
if (!job) {
|
||||
return
|
||||
}
|
||||
|
||||
clearTimeout(job.timer)
|
||||
this.scheduled.delete(jobId)
|
||||
}
|
||||
|
||||
async removeAll(): Promise<void> {
|
||||
this.scheduled.forEach((_, key) => {
|
||||
this.remove(key)
|
||||
})
|
||||
}
|
||||
|
||||
async jobHandler(jobId: string) {
|
||||
const job = this.scheduled.get(jobId)
|
||||
if (!job) {
|
||||
return
|
||||
}
|
||||
|
||||
if (
|
||||
job.config?.numberOfExecutions !== undefined &&
|
||||
job.config.numberOfExecutions <= job.numberOfExecutions
|
||||
) {
|
||||
this.scheduled.delete(jobId)
|
||||
return
|
||||
}
|
||||
|
||||
const nextExecution = job.expression.next().getTime() - Date.now()
|
||||
const timer = setTimeout(async () => {
|
||||
this.jobHandler(jobId)
|
||||
}, nextExecution)
|
||||
|
||||
this.scheduled.set(jobId, {
|
||||
timer,
|
||||
expression: job.expression,
|
||||
numberOfExecutions: (job.numberOfExecutions ?? 0) + 1,
|
||||
config: job.config,
|
||||
})
|
||||
|
||||
// With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported.
|
||||
await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
import { SchedulerOptions } from "@medusajs/orchestration"
|
||||
import {
|
||||
createStep,
|
||||
createWorkflow,
|
||||
StepResponse,
|
||||
} from "@medusajs/workflows-sdk"
|
||||
|
||||
export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
|
||||
const workflowScheduledStepInvoke = jest.fn((input, context) => {
|
||||
return new StepResponse({})
|
||||
})
|
||||
|
||||
const step = createStep("step_1", workflowScheduledStepInvoke)
|
||||
|
||||
createWorkflow(
|
||||
{ name, schedule: schedule ?? "* * * * * *" },
|
||||
function (input) {
|
||||
return step(input)
|
||||
}
|
||||
)
|
||||
|
||||
return workflowScheduledStepInvoke
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import { knex } from "knex"
|
||||
import { setTimeout } from "timers/promises"
|
||||
import "../__fixtures__"
|
||||
import { DB_URL, TestDatabase } from "../utils"
|
||||
import { createScheduled } from "../__fixtures__/workflow_scheduled"
|
||||
|
||||
const sharedPgConnection = knex<any, any>({
|
||||
client: "pg",
|
||||
@@ -297,4 +298,23 @@ describe("Workflow Orchestrator module", function () {
|
||||
expect(onFinish).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
})
|
||||
|
||||
// Note: These tests depend on actual Redis instance and waiting for the scheduled jobs to run, which isn't great.
|
||||
// Mocking bullmq, however, would make the tests close to useless, so we can keep them very minimal and serve as smoke tests.
|
||||
describe("Scheduled workflows", () => {
|
||||
it("should execute a scheduled workflow", async () => {
|
||||
const spy = createScheduled("standard")
|
||||
await setTimeout(3100)
|
||||
expect(spy).toHaveBeenCalledTimes(3)
|
||||
})
|
||||
|
||||
it("should stop executions after the set number of executions", async () => {
|
||||
const spy = await createScheduled("num-executions", {
|
||||
cron: "* * * * * *",
|
||||
numberOfExecutions: 2,
|
||||
})
|
||||
await setTimeout(3100)
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
DistributedTransactionEvents,
|
||||
TransactionHandlerType,
|
||||
TransactionStep,
|
||||
WorkflowScheduler,
|
||||
} from "@medusajs/orchestration"
|
||||
import {
|
||||
ContainerLike,
|
||||
@@ -110,6 +111,7 @@ export class WorkflowOrchestratorService {
|
||||
|
||||
if (!dataLoaderOnly) {
|
||||
DistributedTransaction.setStorage(redisDistributedTransactionStorage)
|
||||
WorkflowScheduler.setStorage(redisDistributedTransactionStorage)
|
||||
}
|
||||
|
||||
this.redisDistributedTransactionStorage_ =
|
||||
|
||||
@@ -1,23 +1,28 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionStorage,
|
||||
IDistributedSchedulerStorage,
|
||||
IDistributedTransactionStorage,
|
||||
SchedulerOptions,
|
||||
TransactionCheckpoint,
|
||||
TransactionStep,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ModulesSdkTypes } from "@medusajs/types"
|
||||
import { TransactionState } from "@medusajs/utils"
|
||||
import { TransactionState, promiseAll } from "@medusajs/utils"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import { Queue, Worker } from "bullmq"
|
||||
import Redis from "ioredis"
|
||||
|
||||
enum JobType {
|
||||
SCHEDULE = "schedule",
|
||||
RETRY = "retry",
|
||||
STEP_TIMEOUT = "step_timeout",
|
||||
TRANSACTION_TIMEOUT = "transaction_timeout",
|
||||
}
|
||||
|
||||
// eslint-disable-next-line max-len
|
||||
export class RedisDistributedTransactionStorage extends DistributedTransactionStorage {
|
||||
export class RedisDistributedTransactionStorage
|
||||
implements IDistributedTransactionStorage, IDistributedSchedulerStorage
|
||||
{
|
||||
private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes
|
||||
private workflowExecutionService_: ModulesSdkTypes.InternalModuleService<any>
|
||||
private workflowOrchestratorService_: WorkflowOrchestratorService
|
||||
@@ -37,8 +42,6 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
|
||||
redisWorkerConnection: Redis
|
||||
redisQueueName: string
|
||||
}) {
|
||||
super()
|
||||
|
||||
this.workflowExecutionService_ = workflowExecutionService
|
||||
|
||||
this.redisClient = redisConnection
|
||||
@@ -59,6 +62,14 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
|
||||
job.data.transactionId
|
||||
)
|
||||
}
|
||||
|
||||
// Note: We might even want a separate worker with different concurrency settings in the future, but for now we keep it simple
|
||||
if (job.name === JobType.SCHEDULE) {
|
||||
await this.executeScheduledJob(
|
||||
job.data.jobId,
|
||||
job.data.schedulerOptions
|
||||
)
|
||||
}
|
||||
},
|
||||
{ connection: redisWorkerConnection }
|
||||
)
|
||||
@@ -108,6 +119,17 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
|
||||
})
|
||||
}
|
||||
|
||||
private async executeScheduledJob(
|
||||
jobId: string,
|
||||
schedulerOptions: SchedulerOptions
|
||||
) {
|
||||
// TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency
|
||||
// of the transaction to ensure that the transaction is only executed once.
|
||||
return await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
}
|
||||
|
||||
async get(key: string): Promise<TransactionCheckpoint | undefined> {
|
||||
const data = await this.redisClient.get(key)
|
||||
|
||||
@@ -290,4 +312,43 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt
|
||||
await job.remove()
|
||||
}
|
||||
}
|
||||
|
||||
/* Scheduler storage methods */
|
||||
async schedule(
|
||||
jobDefinition: string | { jobId: string },
|
||||
schedulerOptions: SchedulerOptions
|
||||
): Promise<void> {
|
||||
const jobId =
|
||||
typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId
|
||||
|
||||
// In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one
|
||||
// any only then we add the new one.
|
||||
await this.remove(jobId)
|
||||
|
||||
await this.queue.add(
|
||||
JobType.SCHEDULE,
|
||||
{
|
||||
jobId,
|
||||
schedulerOptions,
|
||||
},
|
||||
{
|
||||
repeat: {
|
||||
pattern: schedulerOptions.cron,
|
||||
limit: schedulerOptions.numberOfExecutions,
|
||||
},
|
||||
jobId: `${JobType.SCHEDULE}_${jobId}`,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
async remove(jobId: string): Promise<void> {
|
||||
await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`)
|
||||
}
|
||||
|
||||
async removeAll(): Promise<void> {
|
||||
const repeatableJobs = await this.queue.getRepeatableJobs()
|
||||
await promiseAll(
|
||||
repeatableJobs.map((job) => this.queue.removeRepeatableByKey(job.key))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5851,6 +5851,7 @@ __metadata:
|
||||
"@mikro-orm/migrations": 5.9.7
|
||||
"@mikro-orm/postgresql": 5.9.7
|
||||
awilix: ^8.0.0
|
||||
cron-parser: ^4.9.0
|
||||
cross-env: ^5.2.1
|
||||
dotenv: ^16.4.5
|
||||
jest: ^29.6.3
|
||||
@@ -14772,7 +14773,7 @@ __metadata:
|
||||
languageName: node
|
||||
linkType: hard
|
||||
|
||||
"cron-parser@npm:^4.2.0, cron-parser@npm:^4.6.0":
|
||||
"cron-parser@npm:^4.2.0, cron-parser@npm:^4.6.0, cron-parser@npm:^4.9.0":
|
||||
version: 4.9.0
|
||||
resolution: "cron-parser@npm:4.9.0"
|
||||
dependencies:
|
||||
|
||||
Reference in New Issue
Block a user