diff --git a/.changeset/slimy-jobs-warn.md b/.changeset/slimy-jobs-warn.md new file mode 100644 index 0000000000..a16b873355 --- /dev/null +++ b/.changeset/slimy-jobs-warn.md @@ -0,0 +1,6 @@ +--- +"medusa-plugin-brightpearl": patch +"@medusajs/medusa": patch +--- + +feat(medusa): Extract cron jobs logic from the EventBusService to its own service JobSchedulerService diff --git a/docs/content/advanced/backend/cron-jobs/create.md b/docs/content/advanced/backend/cron-jobs/create.md deleted file mode 100644 index 79c607d6dd..0000000000 --- a/docs/content/advanced/backend/cron-jobs/create.md +++ /dev/null @@ -1,128 +0,0 @@ -# How to Create a Cron Job - -In this document, you’ll learn how to create a cron job in Medusa. - -## Overview - -Medusa allows you to create cron jobs that run at specific times during your server’s lifetime. For example, you can synchronize your inventory with an Enterprise Resource Planning (ERP) system once a day. - -This guide explains how to create a cron job on your Medusa server. The cron job in this example will simply change the status of draft products to `published`. - -## Prerequisites - -### Medusa Components - -It is assumed that you already have a Medusa server installed and set up. If not, you can follow the [quickstart guide](../../../quickstart/quick-start.md) to get started. - -### Redis - -Redis is required for cron jobs to work. Make sure you [install Redis](../../../tutorial/0-set-up-your-development-environment.mdx#redis) and [configure it with your Medusa server](../../../usage/configurations.md#redis). - -## 1. Create a File - -Each cron job should reside in a TypeScript or JavaScript file under the `src/loaders` directory. - -Start by creating the `src/loaders` directory. Then, inside that directory, create the JavaScript or TypeScript file that you’ll add the cron job in. You can use any name for the file. - -For the example in this tutorial, you can create the file `src/loaders/publish.ts`. - -## 2. Create Cron Job - -To create a cron job, add the following code in the file you created, which is `src/loaders/publish.ts` in this example: - -```ts title=src/loaders/publish.ts -const publishJob = async (container, options) => { - const eventBus = container.resolve("eventBusService"); - eventBus.createCronJob("publish-products", {}, "0 0 * * *", async () => { - //job to execute - const productService = container.resolve("productService"); - const draftProducts = await productService.list({ - status: 'draft' - }); - - for (const product of draftProducts) { - await productService.update(product.id, { - status: 'published' - }); - } - }) -} - -export default publishJob; -``` - -This file should export a function that accepts a `container` and `options` parameters. `container` is the dependency container that you can use to resolve services, such as the [EventBusService](../../../references/services/classes/EventBusService.md). `options` are the plugin’s options if this cron job is created in a plugin. - -You then resolve the `EventBusService` and use the `eventBus.createCronJob` method to create the cron job. This method accepts four parameters: - -- The first parameter is a unique name to give to the cron job. In the example above, you use the name `publish-products`; -- The second parameter is an object which can be used to [pass data to the job](#pass-data-to-the-cron-job); -- The third parameter is the cron job expression pattern. In this example, it will execute the cron job once a day at 12 AM. -- The fourth parameter is the function to execute. This is where you add the code to execute once the cron job runs. In this example, you retrieve the draft products using the [ProductService](../../../references/services/classes/ProductService.md) and update the status of each of these products to `published`. - -:::tip - -You can see examples of cron job expression patterns on [crontab guru](https://crontab.guru/examples.html). - -::: - -### Pass Data to the Cron Job - -To pass data to your cron job, you can add them to the object passed as a second parameter under the `data` property. This is helpful if you use one function to handle multiple cron jobs. - -For example: - -```ts -eventBus.createCronJob("publish-products", { - data: { - productId - } - }, "0 0 * * *", async (job) => { - console.log(job.data); // {productId: 'prod_124...'} - //... -}); -``` - -## 3. Run Medusa Server - -:::info - -Cron Jobs only run while the Medusa server is running. - -::: - -In your terminal run the following command to run your Medusa server: - -```bash npm2yarn -npm run start -``` - -This builds your code under the `src` directory into the `dist` directory, then runs the Medusa server. - -If the cron job was registered successfully, you should see a message similar to this logged on your Medusa server: - -```bash -Registering publish-products -``` - -Where `publish-products` is the unique name you provided to the cron job. - -Once it is time to run your cron job based on the cron job expression pattern, the cron job will run and you can see it logged on your Medusa server. - -For example, the above cron job will run at 12 AM and, when it runs, you can see the following logged on your Medusa server: - -```bash noReport -info: Processing cron job: publish-products -``` - -If you log anything in the cron job, for example using `console.log`, or if any errors are thrown, it’ll also be logged on your Medusa server. - -:::tip - -To test the previous example out instantly, you can change the cron job expression pattern passed as the third parameter to `eventBus.createCronJob` to `* * * * *`. This will run the cron job every minute. - -::: - -## What’s Next - -- Learn more about [services and how you can use them](../services/overview.md). diff --git a/docs/content/advanced/backend/scheduled-jobs/create.md b/docs/content/advanced/backend/scheduled-jobs/create.md new file mode 100644 index 0000000000..246f2be93d --- /dev/null +++ b/docs/content/advanced/backend/scheduled-jobs/create.md @@ -0,0 +1,134 @@ +# How to Create a Scheduled Job + +In this document, you’ll learn how to create a scheduled job in Medusa. + +## Overview + +Medusa allows you to create scheduled jobs that run at specific times during your server’s lifetime. For example, you can synchronize your inventory with an Enterprise Resource Planning (ERP) system once a day. + +This guide explains how to create a scheduled job on your Medusa server. The scheduled job in this example will simply change the status of draft products to `published`. + +## Prerequisites + +### Medusa Components + +It is assumed that you already have a Medusa server installed and set up. If not, you can follow the [quickstart guide](../../../quickstart/quick-start.md) to get started. + +### Redis + +Redis is required for scheduled jobs to work. Make sure you [install Redis](../../../tutorial/0-set-up-your-development-environment.mdx#redis) and [configure it with your Medusa server](../../../usage/configurations.md#redis). + +## 1. Create a File + +Each scheduled job should reside in a TypeScript or JavaScript file under the `src/loaders` directory. + +Start by creating the `src/loaders` directory. Then, inside that directory, create the JavaScript or TypeScript file that you’ll add the scheduled job in. You can use any name for the file. + +For the example in this tutorial, you can create the file `src/loaders/publish.ts`. + +## 2. Create Cron Job + +To create a scheduled job, add the following code in the file you created, which is `src/loaders/publish.ts` in this example: + +```ts title=src/loaders/publish.ts +const publishJob = async (container, options) => { + const jobSchedulerService = container.resolve("jobSchedulerService"); + jobSchedulerService.create("publish-products", {}, "0 0 * * *", async () => { + //job to execute + const productService = container.resolve("productService"); + const draftProducts = await productService.list({ + status: 'draft' + }); + + for (const product of draftProducts) { + await productService.update(product.id, { + status: 'published' + }); + } + }) +} + +export default publishJob; +``` + +:::info + +The service taking care of background jobs was renamed in v1.7.1. If you are running a previous version, use `eventBusService` instead of `jobSchedulerService`. + +::: + +This file should export a function that accepts a `container` and `options` parameters. `container` is the dependency container that you can use to resolve services, such as the JobSchedulerService. `options` are the plugin’s options if this scheduled job is created in a plugin. + +You then resolve the `JobSchedulerService` and use the `jobSchedulerService.create` method to create the scheduled job. This method accepts four parameters: + +- The first parameter is a unique name to give to the scheduled job. In the example above, you use the name `publish-products`; +- The second parameter is an object which can be used to [pass data to the job](#pass-data-to-the-scheduled-job); +- The third parameter is the scheduled job expression pattern. In this example, it will execute the scheduled job once a day at 12 AM. +- The fourth parameter is the function to execute. This is where you add the code to execute once the scheduled job runs. In this example, you retrieve the draft products using the [ProductService](../../../references/services/classes/ProductService.md) and update the status of each of these products to `published`. + +:::tip + +You can see examples of scheduled job expression patterns on [crontab guru](https://crontab.guru/examples.html). + +::: + +### Pass Data to the Cron Job + +To pass data to your scheduled job, you can add them to the object passed as a second parameter under the `data` property. This is helpful if you use one function to handle multiple scheduled jobs. + +For example: + +```ts +jobSchedulerService.create("publish-products", { + data: { + productId + } + }, "0 0 * * *", async (job) => { + console.log(job.data); // {productId: 'prod_124...'} + //... +}); +``` + +## 3. Run Medusa Server + +:::info + +Cron Jobs only run while the Medusa server is running. + +::: + +In your terminal run the following command to run your Medusa server: + +```bash npm2yarn +npm run start +``` + +This builds your code under the `src` directory into the `dist` directory, then runs the Medusa server. + +If the scheduled job was registered successfully, you should see a message similar to this logged on your Medusa server: + +```bash +Registering publish-products +``` + +Where `publish-products` is the unique name you provided to the scheduled job. + +Once it is time to run your scheduled job based on the scheduled job expression pattern, the scheduled job will run and you can see it logged on your Medusa server. + +For example, the above scheduled job will run at 12 AM and, when it runs, you can see the following logged on your Medusa server: + +```bash noReport +info: Processing scheduled job: publish-products +``` + +If you log anything in the scheduled job, for example using `console.log`, or if any errors are thrown, it’ll also be logged on your Medusa server. + +:::tip + +To test the previous example out instantly, you can change the scheduled job expression pattern passed as the third parameter to `jobSchedulerService.create` to `* * * * *`. This will run the scheduled job every minute. + +::: + +## What’s Next + +- Learn more about [services and how you can use them](../services/overview.md). diff --git a/packages/medusa-plugin-brightpearl/src/loaders/inventory.js b/packages/medusa-plugin-brightpearl/src/loaders/inventory.js index fc76a4fdf0..b253dd04f4 100644 --- a/packages/medusa-plugin-brightpearl/src/loaders/inventory.js +++ b/packages/medusa-plugin-brightpearl/src/loaders/inventory.js @@ -3,10 +3,10 @@ const inventorySync = async (container, options) => { return } else { const brightpearlService = container.resolve("brightpearlService") - const eventBus = container.resolve("eventBusService") + const jobSchedulerService = container.resolve("jobSchedulerService") try { const pattern = options.inventory_sync_cron - eventBus.createCronJob("inventory-sync", {}, pattern, () => + jobSchedulerService.create("inventory-sync", {}, pattern, () => brightpearlService.syncInventory() ) } catch (err) { diff --git a/packages/medusa/src/services/__tests__/event-bus.js b/packages/medusa/src/services/__tests__/event-bus.js index 83e59c77ce..fa4392c0c9 100644 --- a/packages/medusa/src/services/__tests__/event-bus.js +++ b/packages/medusa/src/services/__tests__/event-bus.js @@ -1,7 +1,7 @@ import Bull from "bull" -import { MockRepository, MockManager } from "medusa-test-utils" -import EventBusService from "../event-bus" +import { MockManager, MockRepository } from "medusa-test-utils" import config from "../../loaders/config" +import EventBusService from "../event-bus" jest.genMockFromModule("bull") jest.mock("bull") @@ -36,7 +36,7 @@ describe("EventBusService", () => { }) it("creates bull queue", () => { - expect(Bull).toHaveBeenCalledTimes(2) + expect(Bull).toHaveBeenCalledTimes(1) expect(Bull).toHaveBeenCalledWith("EventBusService:queue", { createClient: expect.any(Function), }) @@ -97,7 +97,8 @@ describe("EventBusService", () => { }) describe("emit", () => { - let eventBus, job + let eventBus + let job describe("successfully adds job to queue", () => { beforeAll(() => { jest.resetAllMocks() @@ -126,7 +127,8 @@ describe("EventBusService", () => { }) describe("worker", () => { - let eventBus, result + let eventBus + let result describe("successfully runs the worker", () => { beforeAll(async () => { jest.resetAllMocks() @@ -134,11 +136,14 @@ describe("EventBusService", () => { find: () => Promise.resolve([]), }) - eventBus = new EventBusService({ - manager: MockManager, - stagedJobRepository, - logger: loggerMock, - }, {}) + eventBus = new EventBusService( + { + manager: MockManager, + stagedJobRepository, + logger: loggerMock, + }, + {} + ) eventBus.subscribe("eventName", () => Promise.resolve("hi")) result = await eventBus.worker_({ data: { eventName: "eventName", data: {} }, diff --git a/packages/medusa/src/services/__tests__/job-scheduler.js b/packages/medusa/src/services/__tests__/job-scheduler.js new file mode 100644 index 0000000000..75e2a934da --- /dev/null +++ b/packages/medusa/src/services/__tests__/job-scheduler.js @@ -0,0 +1,95 @@ +import Bull from "bull" +import config from "../../loaders/config" +import JobSchedulerService from "../job-scheduler" + +jest.genMockFromModule("bull") +jest.mock("bull") +jest.mock("../../loaders/config") + +config.redisURI = "testhost" + +const loggerMock = { + info: jest.fn().mockReturnValue(console.log), + warn: jest.fn().mockReturnValue(console.log), + error: jest.fn().mockReturnValue(console.log), +} + +describe("JobSchedulerService", () => { + describe("constructor", () => { + let jobScheduler + beforeAll(() => { + jest.resetAllMocks() + + jobScheduler = new JobSchedulerService({ + logger: loggerMock, + }) + }) + + it("creates bull queue", () => { + expect(Bull).toHaveBeenCalledTimes(1) + expect(Bull).toHaveBeenCalledWith("scheduled-jobs:queue", { + createClient: expect.any(Function), + }) + }) + }) + + describe("create", () => { + let jobScheduler + describe("successfully creates scheduled job and add handler", () => { + beforeAll(() => { + jest.resetAllMocks() + + jobScheduler = new JobSchedulerService({ + logger: loggerMock, + }) + + jobScheduler.create( + "eventName", + { data: "test" }, + "* * * * *", + () => "test" + ) + }) + + it("added the handler to the job queue", () => { + expect(jobScheduler.handlers_.get("eventName").length).toEqual(1) + }) + }) + + describe("scheduledJobWorker", () => { + let jobScheduler + let result + describe("successfully runs the worker", () => { + beforeAll(async () => { + jest.resetAllMocks() + + jobScheduler = new JobSchedulerService( + { + logger: loggerMock, + }, + {} + ) + + jobScheduler.create("eventName", { data: "test" }, "* * * * *", () => + Promise.resolve("hi") + ) + + result = await jobScheduler.scheduledJobsWorker({ + data: { eventName: "eventName", data: {} }, + }) + }) + + it("calls logger", () => { + expect(loggerMock.info).toHaveBeenCalled() + expect(loggerMock.info).toHaveBeenCalledWith( + "Processing scheduled job: eventName" + ) + }) + + it("returns array with hi", async () => { + expect(result).toEqual(["hi"]) + }) + }) + }) + }) +}) diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index d18cb16d92..e9ae4d896d 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,15 +1,17 @@ import Bull from "bull" import Redis from "ioredis" import { EntityManager } from "typeorm" -import { ConfigModule, Logger } from "../types/global" -import { StagedJobRepository } from "../repositories/staged-job" import { StagedJob } from "../models" +import { StagedJobRepository } from "../repositories/staged-job" +import { ConfigModule, Logger } from "../types/global" import { sleep } from "../utils/sleep" +import JobSchedulerService from "./job-scheduler" type InjectedDependencies = { manager: EntityManager logger: Logger stagedJobRepository: typeof StagedJobRepository + jobSchedulerService: JobSchedulerService redisClient: Redis.Redis redisSubscriber: Redis.Redis } @@ -34,11 +36,10 @@ export default class EventBusService { protected readonly manager_: EntityManager protected readonly logger_: Logger protected readonly stagedJobRepository_: typeof StagedJobRepository + protected readonly jobSchedulerService_: JobSchedulerService protected readonly observers_: Map - protected readonly cronHandlers_: Map protected readonly redisClient_: Redis.Redis protected readonly redisSubscriber_: Redis.Redis - protected readonly cronQueue_: Bull protected queue_: Bull protected shouldEnqueuerRun: boolean protected transactionManager_: EntityManager | undefined @@ -79,14 +80,10 @@ export default class EventBusService { this.observers_ = new Map() this.queue_ = new Bull(`${this.constructor.name}:queue`, opts) - this.cronHandlers_ = new Map() this.redisClient_ = redisClient this.redisSubscriber_ = redisSubscriber - this.cronQueue_ = new Bull(`cron-jobs:queue`, opts) // Register our worker to handle emit calls this.queue_.process(this.worker_) - // Register cron worker - this.cronQueue_.process(this.cronWorker_) if (process.env.NODE_ENV !== "test") { this.startEnqueuer() @@ -103,6 +100,7 @@ export default class EventBusService { { manager: transactionManager, stagedJobRepository: this.stagedJobRepository_, + jobSchedulerService: this.jobSchedulerService_, logger: this.logger_, redisClient: this.redisClient_, redisSubscriber: this.redisSubscriber_, @@ -157,27 +155,6 @@ export default class EventBusService { return this } - /** - * Adds a function to a list of event subscribers. - * @param event - the event that the subscriber will listen for. - * @param subscriber - the function to be called when a certain event - * happens. Subscribers must return a Promise. - * @return this - */ - protected registerCronHandler_( - event: string | symbol, - subscriber: Subscriber - ): this { - if (typeof subscriber !== "function") { - throw new Error("Handler must be a function") - } - - const cronHandlers = this.cronHandlers_.get(event) ?? [] - this.cronHandlers_.set(event, [...cronHandlers, subscriber]) - - return this - } - /** * Calls all subscribers when an event occurs. * @param {string} eventName - the name of the event to be process. @@ -198,7 +175,7 @@ export default class EventBusService { const stagedJobInstance = stagedJobRepository.create({ event_name: eventName, data, - }) + } as StagedJob) return await stagedJobRepository.save(stagedJobInstance) } else { const opts: { removeOnComplete: boolean } & EmitOptions = { @@ -288,32 +265,9 @@ export default class EventBusService { ) } - /** - * Handles incoming jobs. - * @param job The job object - * @return resolves to the results of the subscriber calls. - */ - cronWorker_ = async (job: { - data: { eventName: string; data: T } - }): Promise => { - const { eventName, data } = job.data - const observers = this.cronHandlers_.get(eventName) || [] - this.logger_.info(`Processing cron job: ${eventName}`) - - return await Promise.all( - observers.map(async (subscriber) => { - return subscriber(data, eventName).catch((err) => { - this.logger_.warn( - `An error occured while processing ${eventName}: ${err}` - ) - return err - }) - }) - ) - } - /** * Registers a cron job. + * @deprecated All cron job logic has been refactored to the `JobSchedulerService`. This method will be removed in a future release. * @param eventName - the name of the event * @param data - the data to be sent with the event * @param cron - the cron pattern @@ -326,14 +280,6 @@ export default class EventBusService { cron: string, handler: Subscriber ): void { - this.logger_.info(`Registering ${eventName}`) - this.registerCronHandler_(eventName, handler) - return this.cronQueue_.add( - { - eventName, - data, - }, - { repeat: { cron } } - ) + this.jobSchedulerService_.create(eventName, data, cron, handler) } } diff --git a/packages/medusa/src/services/job-scheduler.ts b/packages/medusa/src/services/job-scheduler.ts new file mode 100644 index 0000000000..cc38c0fd23 --- /dev/null +++ b/packages/medusa/src/services/job-scheduler.ts @@ -0,0 +1,122 @@ +import Bull from "bull" +import Redis from "ioredis" +import { ConfigModule, Logger } from "../types/global" + +type InjectedDependencies = { + logger: Logger + redisClient: Redis.Redis + redisSubscriber: Redis.Redis +} + +type ScheduledJobHandler = ( + data: T, + eventName: string +) => Promise + +export default class JobSchedulerService { + protected readonly config_: ConfigModule + protected readonly logger_: Logger + protected readonly handlers_: Map = + new Map() + protected readonly queue_: Bull + + constructor( + { logger, redisClient, redisSubscriber }: InjectedDependencies, + config: ConfigModule, + singleton = true + ) { + this.config_ = config + this.logger_ = logger + + if (singleton) { + const opts = { + createClient: (type: string): Redis.Redis => { + switch (type) { + case "client": + return redisClient + case "subscriber": + return redisSubscriber + default: + if (config.projectConfig.redis_url) { + return new Redis(config.projectConfig.redis_url) + } + return redisClient + } + }, + } + + this.queue_ = new Bull(`scheduled-jobs:queue`, opts) + // Register scheduled job worker + this.queue_.process(this.scheduledJobsWorker) + } + } + + /** + * Adds a function to a list of event subscribers. + * @param event - the event that the subscriber will listen for. + * @param subscriber - the function to be called when a certain event + * happens. Subscribers must return a Promise. + * @return this + */ + protected registerHandler( + event: string | symbol, + handler: ScheduledJobHandler + ): void { + if (typeof handler !== "function") { + throw new Error("Handler must be a function") + } + + const handlers = this.handlers_.get(event) ?? [] + this.handlers_.set(event, [...handlers, handler]) + } + + /** + * Handles incoming scheduled jobs. + * @param job The job object + * @return resolves to the results of the subscriber calls. + */ + protected scheduledJobsWorker = async (job: { + data: { eventName: string; data: T } + }): Promise => { + const { eventName, data } = job.data + const observers = this.handlers_.get(eventName) || [] + this.logger_.info(`Processing scheduled job: ${eventName}`) + + return await Promise.all( + observers.map(async (subscriber) => { + return subscriber(data, eventName).catch((err) => { + this.logger_.warn( + `An error occured while processing ${eventName}: ${err}` + ) + return err + }) + }) + ) + } + + /** + * Registers a scheduled job. + * @param eventName - the name of the event + * @param data - the data to be sent with the event + * @param schedule - the schedule expression + * @param handler - the handler to call on the job + * @return void + */ + create( + eventName: string, + data: T, + schedule: string, + handler: ScheduledJobHandler + ): void { + this.logger_.info(`Registering ${eventName}`) + this.registerHandler(eventName, handler) + + this.queue_.add( + { + eventName, + data, + }, + { repeat: { cron: schedule } } + ) + } +} diff --git a/www/docs/sidebars.js b/www/docs/sidebars.js index 317114cc31..be8872f617 100644 --- a/www/docs/sidebars.js +++ b/www/docs/sidebars.js @@ -335,8 +335,8 @@ module.exports = { }, { type: "doc", - id: "advanced/backend/cron-jobs/create", - label: "Create a Cron Job" + id: "advanced/backend/scheduled-jobs/create", + label: "Create a Scheduled Job" }, { type: "doc",