From 69410162f6b3ff99d18dc50183c4a809aefa48d4 Mon Sep 17 00:00:00 2001 From: Stevche Radevski Date: Mon, 10 Jun 2024 16:49:52 +0200 Subject: [PATCH] feat: Add support for scheduled workflows (#7651) We still need to: But wanted to open the PR for early feedback on the approach --- .../__fixtures__/mock-scheduler-storage.ts | 18 ++++ .../src/__tests__/workflow/global-workflow.ts | 4 + .../src/__tests__/workflow/local-workflow.ts | 4 + .../transaction/datastore/abstract-storage.ts | 35 ++++++ .../transaction/distributed-transaction.ts | 6 +- .../orchestration/src/transaction/types.ts | 23 ++++ .../core/orchestration/src/workflow/index.ts | 1 + .../orchestration/src/workflow/scheduler.ts | 42 ++++++++ .../src/workflow/workflow-manager.ts | 17 ++- .../src/helper/__tests__/compose.ts | 20 +++- .../__fixtures__/workflow_scheduled.ts | 23 ++++ .../integration-tests/__tests__/index.spec.ts | 101 ++++++++++++------ .../workflow-engine-inmemory/package.json | 1 + .../src/services/workflow-orchestrator.ts | 2 + .../utils/workflow-orchestrator-storage.ts | 93 +++++++++++++++- .../__fixtures__/workflow_scheduled.ts | 23 ++++ .../integration-tests/__tests__/index.spec.ts | 20 ++++ .../src/services/workflow-orchestrator.ts | 2 + .../utils/workflow-orchestrator-storage.ts | 71 +++++++++++- yarn.lock | 3 +- 20 files changed, 465 insertions(+), 44 deletions(-) create mode 100644 packages/core/orchestration/src/__fixtures__/mock-scheduler-storage.ts create mode 100644 packages/core/orchestration/src/workflow/scheduler.ts create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts diff --git a/packages/core/orchestration/src/__fixtures__/mock-scheduler-storage.ts b/packages/core/orchestration/src/__fixtures__/mock-scheduler-storage.ts new file mode 100644 index 0000000000..8cb87a89d5 --- /dev/null +++ b/packages/core/orchestration/src/__fixtures__/mock-scheduler-storage.ts @@ -0,0 +1,18 @@ +import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist" + +export class MockSchedulerStorage implements IDistributedSchedulerStorage { + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + return Promise.resolve() + } + + async remove(jobId: string): Promise { + return Promise.resolve() + } + + async removeAll(): Promise { + return Promise.resolve() + } +} diff --git a/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts b/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts index c4f3be8936..78f666cf22 100644 --- a/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts +++ b/packages/core/orchestration/src/__tests__/workflow/global-workflow.ts @@ -1,6 +1,10 @@ import { GlobalWorkflow } from "../../workflow/global-workflow" import { TransactionState } from "../../transaction/types" import { WorkflowManager } from "../../workflow/workflow-manager" +import { WorkflowScheduler } from "../../workflow/scheduler" +import { MockSchedulerStorage } from "../../__fixtures__/mock-scheduler-storage" + +WorkflowScheduler.setStorage(new MockSchedulerStorage()) describe("WorkflowManager", () => { const container: any = {} diff --git a/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts b/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts index 2bfbeaeecc..a3c59c41f8 100644 --- a/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/__tests__/workflow/local-workflow.ts @@ -1,7 +1,11 @@ +import { MockSchedulerStorage } from "../../__fixtures__/mock-scheduler-storage" import { TransactionState } from "../../transaction/types" import { LocalWorkflow } from "../../workflow/local-workflow" +import { WorkflowScheduler } from "../../workflow/scheduler" import { WorkflowManager } from "../../workflow/workflow-manager" +WorkflowScheduler.setStorage(new MockSchedulerStorage()) + describe("WorkflowManager", () => { const container: any = {} diff --git a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts index defc91d6cd..de8473febe 100644 --- a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts @@ -3,6 +3,18 @@ import { TransactionCheckpoint, } from "../distributed-transaction" import { TransactionStep } from "../transaction-step" +import { SchedulerOptions } from "../types" + +export interface IDistributedSchedulerStorage { + schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise + + remove(jobId: string): Promise + + removeAll(): Promise +} export interface IDistributedTransactionStorage { get(key: string): Promise @@ -36,6 +48,29 @@ export interface IDistributedTransactionStorage { ): Promise } +export abstract class DistributedSchedulerStorage + implements IDistributedSchedulerStorage +{ + constructor() { + /* noop */ + } + + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + throw new Error("Method 'schedule' not implemented.") + } + + async remove(jobId: string): Promise { + throw new Error("Method 'remove' not implemented.") + } + + async removeAll(): Promise { + throw new Error("Method 'removeAll' not implemented.") + } +} + export abstract class DistributedTransactionStorage implements IDistributedTransactionStorage { diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index 0a9e14e787..66b45de671 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -7,7 +7,11 @@ import { TransactionOrchestrator, } from "./transaction-orchestrator" import { TransactionStep, TransactionStepHandler } from "./transaction-step" -import { TransactionHandlerType, TransactionState } from "./types" +import { + SchedulerOptions, + TransactionHandlerType, + TransactionState, +} from "./types" /** * @typedef TransactionMetadata diff --git a/packages/core/orchestration/src/transaction/types.ts b/packages/core/orchestration/src/transaction/types.ts index f113e447df..1e744045d6 100644 --- a/packages/core/orchestration/src/transaction/types.ts +++ b/packages/core/orchestration/src/transaction/types.ts @@ -118,9 +118,32 @@ export type TransactionModelOptions = { */ storeExecution?: boolean + /** + * Defines the workflow as a scheduled workflow that executes based on the cron configuration passed. + * The value can either by a cron expression string, or an object that also allows to define the concurrency behavior. + */ + schedule?: string | SchedulerOptions + // TODO: add metadata field for customizations } +export type SchedulerOptions = { + /** + * The cron expression to schedule the workflow execution. + */ + cron: string + /** + * Setting whether to allow concurrent executions (eg. if the previous execution is still running, should the new one be allowed to run or not) + * By default concurrent executions are not allowed. + */ + concurrency?: "allow" | "forbid" + + /** + * Optionally limit the number of executions for the scheduled workflow. If not set, the workflow will run indefinitely. + */ + numberOfExecutions?: number +} + export type TransactionModel = { id: string flow: TransactionStepsDefinition diff --git a/packages/core/orchestration/src/workflow/index.ts b/packages/core/orchestration/src/workflow/index.ts index 6a8abc0698..9416514e1f 100644 --- a/packages/core/orchestration/src/workflow/index.ts +++ b/packages/core/orchestration/src/workflow/index.ts @@ -1,3 +1,4 @@ export * from "./workflow-manager" export * from "./local-workflow" export * from "./global-workflow" +export * from "./scheduler" diff --git a/packages/core/orchestration/src/workflow/scheduler.ts b/packages/core/orchestration/src/workflow/scheduler.ts new file mode 100644 index 0000000000..89eaa20172 --- /dev/null +++ b/packages/core/orchestration/src/workflow/scheduler.ts @@ -0,0 +1,42 @@ +import { MedusaError } from "@medusajs/utils" +import { IDistributedSchedulerStorage, SchedulerOptions } from "../transaction" +import { WorkflowDefinition } from "./workflow-manager" + +export class WorkflowScheduler { + private static storage: IDistributedSchedulerStorage + public static setStorage(storage: IDistributedSchedulerStorage) { + this.storage = storage + } + + public async scheduleWorkflow(workflow: WorkflowDefinition) { + const schedule = workflow.options?.schedule + if (!schedule) { + throw new MedusaError( + MedusaError.Types.INVALID_ARGUMENT, + "Workflow schedule is not defined while registering a scheduled workflow" + ) + } + + const normalizedSchedule: SchedulerOptions = + typeof schedule === "string" + ? { + cron: schedule, + concurrency: "forbid", + } + : { + cron: schedule.cron, + concurrency: schedule.concurrency || "forbid", + numberOfExecutions: schedule.numberOfExecutions, + } + + await WorkflowScheduler.storage.schedule(workflow.id, normalizedSchedule) + } + + public async clearWorkflow(workflow: WorkflowDefinition) { + await WorkflowScheduler.storage.remove(workflow.id) + } + + public async clear() { + await WorkflowScheduler.storage.removeAll() + } +} diff --git a/packages/core/orchestration/src/workflow/workflow-manager.ts b/packages/core/orchestration/src/workflow/workflow-manager.ts index 69af1b1bad..3bbb8ca295 100644 --- a/packages/core/orchestration/src/workflow/workflow-manager.ts +++ b/packages/core/orchestration/src/workflow/workflow-manager.ts @@ -10,6 +10,7 @@ import { TransactionStepHandler, TransactionStepsDefinition, } from "../transaction" +import { WorkflowScheduler } from "./scheduler" export interface WorkflowDefinition { id: string @@ -51,13 +52,20 @@ export type WorkflowStepHandler = ( export class WorkflowManager { protected static workflows: Map = new Map() + protected static scheduler = new WorkflowScheduler() static unregister(workflowId: string) { + const workflow = WorkflowManager.workflows.get(workflowId) + if (workflow?.options.schedule) { + this.scheduler.clearWorkflow(workflow) + } + WorkflowManager.workflows.delete(workflowId) } static unregisterAll() { WorkflowManager.workflows.clear() + this.scheduler.clear() } static getWorkflows() { @@ -111,7 +119,7 @@ export class WorkflowManager { } } - WorkflowManager.workflows.set(workflowId, { + const workflow = { id: workflowId, flow_: finalFlow!, orchestrator: new TransactionOrchestrator( @@ -124,7 +132,12 @@ export class WorkflowManager { options, requiredModules, optionalModules, - }) + } + + WorkflowManager.workflows.set(workflowId, workflow) + if (options.schedule) { + this.scheduler.scheduleWorkflow(workflow) + } } static update( diff --git a/packages/core/workflows-sdk/src/helper/__tests__/compose.ts b/packages/core/workflows-sdk/src/helper/__tests__/compose.ts index 4506c0c163..9b5f672b7f 100644 --- a/packages/core/workflows-sdk/src/helper/__tests__/compose.ts +++ b/packages/core/workflows-sdk/src/helper/__tests__/compose.ts @@ -1,4 +1,4 @@ -import { WorkflowManager } from "@medusajs/orchestration" +import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration" import { promiseAll } from "@medusajs/utils" import { createStep, @@ -11,6 +11,24 @@ import { } from "../.." jest.setTimeout(30000) +import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist" + +class MockSchedulerStorage implements IDistributedSchedulerStorage { + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + return Promise.resolve() + } + async remove(jobId: string): Promise { + return Promise.resolve() + } + async removeAll(): Promise { + return Promise.resolve() + } +} + +WorkflowScheduler.setStorage(new MockSchedulerStorage()) const afterEach_ = () => { jest.clearAllMocks() diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts new file mode 100644 index 0000000000..770527d94d --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts @@ -0,0 +1,23 @@ +import { SchedulerOptions } from "@medusajs/orchestration" +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/workflows-sdk" + +export const createScheduled = (name: string, schedule?: SchedulerOptions) => { + const workflowScheduledStepInvoke = jest.fn((input, context) => { + return new StepResponse({}) + }) + + const step = createStep("step_1", workflowScheduledStepInvoke) + + createWorkflow( + { name, schedule: schedule ?? "* * * * * *" }, + function (input) { + return step(input) + } + ) + + return workflowScheduledStepInvoke +} diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e37fa50b48..45b2b0e899 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -3,9 +3,10 @@ import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" -import { setTimeout } from "timers/promises" +import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" +import { createScheduled } from "../__fixtures__/workflow_scheduled" import { DB_URL, TestDatabase } from "../utils" const sharedPgConnection = knex({ @@ -24,42 +25,41 @@ const afterEach_ = async () => { jest.setTimeout(50000) describe("Workflow Orchestrator module", function () { - describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowEngineService - let query: ( - query: string | RemoteJoinerQuery | object, - variables?: Record - ) => Promise + let workflowOrcModule: IWorkflowEngineService + let query: ( + query: string | RemoteJoinerQuery | object, + variables?: Record + ) => Promise - afterEach(afterEach_) + afterEach(afterEach_) - beforeAll(async () => { - const { - runMigrations, - query: remoteQuery, - modules, - } = await MedusaApp({ - sharedResourcesConfig: { - database: { - connection: sharedPgConnection, - }, + beforeAll(async () => { + const { + runMigrations, + query: remoteQuery, + modules, + } = await MedusaApp({ + sharedResourcesConfig: { + database: { + connection: sharedPgConnection, }, - modulesConfig: { - workflows: { - resolve: __dirname + "/../..", - }, + }, + modulesConfig: { + workflows: { + resolve: __dirname + "/../..", }, - }) - - query = remoteQuery - - await runMigrations() - - workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService + }, }) - afterEach(afterEach_) + query = remoteQuery + await runMigrations() + + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService + }) + + afterEach(afterEach_) + describe("Testing basic workflow", function () { it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => { await workflowOrcModule.run("workflow_1", { input: { @@ -168,7 +168,7 @@ describe("Workflow Orchestrator module", function () { } ) - await setTimeout(200) + await setTimeoutPromise(200) expect(transaction.flow.state).toEqual("reverted") }) @@ -201,4 +201,43 @@ describe("Workflow Orchestrator module", function () { expect(onFinish).toHaveBeenCalledTimes(0) }) }) + + describe("Scheduled workflows", () => { + beforeAll(() => { + jest.useFakeTimers() + jest.spyOn(global, "setTimeout") + }) + + afterAll(() => { + jest.useRealTimers() + }) + + it("should execute a scheduled workflow", async () => { + const spy = createScheduled("standard") + + await jest.runOnlyPendingTimersAsync() + expect(setTimeout).toHaveBeenCalledTimes(2) + expect(spy).toHaveBeenCalledTimes(1) + + await jest.runOnlyPendingTimersAsync() + expect(setTimeout).toHaveBeenCalledTimes(3) + expect(spy).toHaveBeenCalledTimes(2) + }) + + it("should stop executions after the set number of executions", async () => { + const spy = await createScheduled("num-executions", { + cron: "* * * * * *", + numberOfExecutions: 2, + }) + + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(1) + + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(2) + + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(2) + }) + }) }) diff --git a/packages/modules/workflow-engine-inmemory/package.json b/packages/modules/workflow-engine-inmemory/package.json index 41743248c7..ca4acd23c3 100644 --- a/packages/modules/workflow-engine-inmemory/package.json +++ b/packages/modules/workflow-engine-inmemory/package.json @@ -53,6 +53,7 @@ "@mikro-orm/migrations": "5.9.7", "@mikro-orm/postgresql": "5.9.7", "awilix": "^8.0.0", + "cron-parser": "^4.9.0", "dotenv": "^16.4.5", "knex": "2.4.2" } diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index 1fc90daee5..d8bad5a78e 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -3,6 +3,7 @@ import { DistributedTransactionEvents, TransactionHandlerType, TransactionStep, + WorkflowScheduler, } from "@medusajs/orchestration" import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" @@ -83,6 +84,7 @@ export class WorkflowOrchestratorService { }) { inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this) DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage) + WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage) } @InjectSharedContext() diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index f5cc70070d..eb94cf8701 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -1,19 +1,34 @@ import { DistributedTransaction, DistributedTransactionStorage, + IDistributedSchedulerStorage, + IDistributedTransactionStorage, + SchedulerOptions, TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" import { ModulesSdkTypes } from "@medusajs/types" import { TransactionState } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" +import { CronExpression, parseExpression } from "cron-parser" // eslint-disable-next-line max-len -export class InMemoryDistributedTransactionStorage extends DistributedTransactionStorage { +export class InMemoryDistributedTransactionStorage + implements IDistributedTransactionStorage, IDistributedSchedulerStorage +{ private workflowExecutionService_: ModulesSdkTypes.InternalModuleService private workflowOrchestratorService_: WorkflowOrchestratorService private storage: Map = new Map() + private scheduled: Map< + string, + { + timer: NodeJS.Timeout + expression: CronExpression + numberOfExecutions: number + config: SchedulerOptions + } + > = new Map() private retries: Map = new Map() private timeouts: Map = new Map() @@ -22,8 +37,6 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio }: { workflowExecutionService: ModulesSdkTypes.InternalModuleService }) { - super() - this.workflowExecutionService_ = workflowExecutionService } @@ -215,4 +228,78 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio this.timeouts.delete(key) } } + + /* Scheduler storage methods */ + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + const jobId = + typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId + + // In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one + // any only then we add the new one. + await this.remove(jobId) + const expression = parseExpression(schedulerOptions.cron) + const nextExecution = expression.next().getTime() - Date.now() + + const timer = setTimeout(async () => { + this.jobHandler(jobId) + }, nextExecution) + + this.scheduled.set(jobId, { + timer, + expression, + numberOfExecutions: 0, + config: schedulerOptions, + }) + } + + async remove(jobId: string): Promise { + const job = this.scheduled.get(jobId) + if (!job) { + return + } + + clearTimeout(job.timer) + this.scheduled.delete(jobId) + } + + async removeAll(): Promise { + this.scheduled.forEach((_, key) => { + this.remove(key) + }) + } + + async jobHandler(jobId: string) { + const job = this.scheduled.get(jobId) + if (!job) { + return + } + + if ( + job.config?.numberOfExecutions !== undefined && + job.config.numberOfExecutions <= job.numberOfExecutions + ) { + this.scheduled.delete(jobId) + return + } + + const nextExecution = job.expression.next().getTime() - Date.now() + const timer = setTimeout(async () => { + this.jobHandler(jobId) + }, nextExecution) + + this.scheduled.set(jobId, { + timer, + expression: job.expression, + numberOfExecutions: (job.numberOfExecutions ?? 0) + 1, + config: job.config, + }) + + // With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported. + await this.workflowOrchestratorService_.run(jobId, { + throwOnError: false, + }) + } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts new file mode 100644 index 0000000000..770527d94d --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts @@ -0,0 +1,23 @@ +import { SchedulerOptions } from "@medusajs/orchestration" +import { + createStep, + createWorkflow, + StepResponse, +} from "@medusajs/workflows-sdk" + +export const createScheduled = (name: string, schedule?: SchedulerOptions) => { + const workflowScheduledStepInvoke = jest.fn((input, context) => { + return new StepResponse({}) + }) + + const step = createStep("step_1", workflowScheduledStepInvoke) + + createWorkflow( + { name, schedule: schedule ?? "* * * * * *" }, + function (input) { + return step(input) + } + ) + + return workflowScheduledStepInvoke +} diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 0763c652dd..6a1250b3f0 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -10,6 +10,7 @@ import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" import { DB_URL, TestDatabase } from "../utils" +import { createScheduled } from "../__fixtures__/workflow_scheduled" const sharedPgConnection = knex({ client: "pg", @@ -297,4 +298,23 @@ describe("Workflow Orchestrator module", function () { expect(onFinish).toHaveBeenCalledTimes(0) }) }) + + // Note: These tests depend on actual Redis instance and waiting for the scheduled jobs to run, which isn't great. + // Mocking bullmq, however, would make the tests close to useless, so we can keep them very minimal and serve as smoke tests. + describe("Scheduled workflows", () => { + it("should execute a scheduled workflow", async () => { + const spy = createScheduled("standard") + await setTimeout(3100) + expect(spy).toHaveBeenCalledTimes(3) + }) + + it("should stop executions after the set number of executions", async () => { + const spy = await createScheduled("num-executions", { + cron: "* * * * * *", + numberOfExecutions: 2, + }) + await setTimeout(3100) + expect(spy).toHaveBeenCalledTimes(2) + }) + }) }) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 300d248344..14e56fe9f3 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -3,6 +3,7 @@ import { DistributedTransactionEvents, TransactionHandlerType, TransactionStep, + WorkflowScheduler, } from "@medusajs/orchestration" import { ContainerLike, @@ -110,6 +111,7 @@ export class WorkflowOrchestratorService { if (!dataLoaderOnly) { DistributedTransaction.setStorage(redisDistributedTransactionStorage) + WorkflowScheduler.setStorage(redisDistributedTransactionStorage) } this.redisDistributedTransactionStorage_ = diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index af19cd5904..c8c5ddbd00 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -1,23 +1,28 @@ import { DistributedTransaction, - DistributedTransactionStorage, + IDistributedSchedulerStorage, + IDistributedTransactionStorage, + SchedulerOptions, TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" import { ModulesSdkTypes } from "@medusajs/types" -import { TransactionState } from "@medusajs/utils" +import { TransactionState, promiseAll } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" import Redis from "ioredis" enum JobType { + SCHEDULE = "schedule", RETRY = "retry", STEP_TIMEOUT = "step_timeout", TRANSACTION_TIMEOUT = "transaction_timeout", } // eslint-disable-next-line max-len -export class RedisDistributedTransactionStorage extends DistributedTransactionStorage { +export class RedisDistributedTransactionStorage + implements IDistributedTransactionStorage, IDistributedSchedulerStorage +{ private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes private workflowExecutionService_: ModulesSdkTypes.InternalModuleService private workflowOrchestratorService_: WorkflowOrchestratorService @@ -37,8 +42,6 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt redisWorkerConnection: Redis redisQueueName: string }) { - super() - this.workflowExecutionService_ = workflowExecutionService this.redisClient = redisConnection @@ -59,6 +62,14 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt job.data.transactionId ) } + + // Note: We might even want a separate worker with different concurrency settings in the future, but for now we keep it simple + if (job.name === JobType.SCHEDULE) { + await this.executeScheduledJob( + job.data.jobId, + job.data.schedulerOptions + ) + } }, { connection: redisWorkerConnection } ) @@ -108,6 +119,17 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt }) } + private async executeScheduledJob( + jobId: string, + schedulerOptions: SchedulerOptions + ) { + // TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency + // of the transaction to ensure that the transaction is only executed once. + return await this.workflowOrchestratorService_.run(jobId, { + throwOnError: false, + }) + } + async get(key: string): Promise { const data = await this.redisClient.get(key) @@ -290,4 +312,43 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt await job.remove() } } + + /* Scheduler storage methods */ + async schedule( + jobDefinition: string | { jobId: string }, + schedulerOptions: SchedulerOptions + ): Promise { + const jobId = + typeof jobDefinition === "string" ? jobDefinition : jobDefinition.jobId + + // In order to ensure that the schedule configuration is always up to date, we first cancel an existing job, if there was one + // any only then we add the new one. + await this.remove(jobId) + + await this.queue.add( + JobType.SCHEDULE, + { + jobId, + schedulerOptions, + }, + { + repeat: { + pattern: schedulerOptions.cron, + limit: schedulerOptions.numberOfExecutions, + }, + jobId: `${JobType.SCHEDULE}_${jobId}`, + } + ) + } + + async remove(jobId: string): Promise { + await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) + } + + async removeAll(): Promise { + const repeatableJobs = await this.queue.getRepeatableJobs() + await promiseAll( + repeatableJobs.map((job) => this.queue.removeRepeatableByKey(job.key)) + ) + } } diff --git a/yarn.lock b/yarn.lock index b0edc3d6de..808207528c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5851,6 +5851,7 @@ __metadata: "@mikro-orm/migrations": 5.9.7 "@mikro-orm/postgresql": 5.9.7 awilix: ^8.0.0 + cron-parser: ^4.9.0 cross-env: ^5.2.1 dotenv: ^16.4.5 jest: ^29.6.3 @@ -14772,7 +14773,7 @@ __metadata: languageName: node linkType: hard -"cron-parser@npm:^4.2.0, cron-parser@npm:^4.6.0": +"cron-parser@npm:^4.2.0, cron-parser@npm:^4.6.0, cron-parser@npm:^4.9.0": version: 4.9.0 resolution: "cron-parser@npm:4.9.0" dependencies: