feat(medusa): Extract cron job logic to its own service (#2821)

**What**
Extract cron job logic from the `EventBusService` to its own service `JobSchedulerService`

**Why**
Preliminary step to extracting the event bus into a module

**Tests**
Tested with a local installation of Medusa

Resolved CORE-918
This commit is contained in:
Oliver Windall Juhl
2022-12-19 20:04:46 +01:00
committed by GitHub
parent c8724da503
commit ba6bb3e54b
9 changed files with 385 additions and 205 deletions

View File

@@ -0,0 +1,6 @@
---
"medusa-plugin-brightpearl": patch
"@medusajs/medusa": patch
---
feat(medusa): Extract cron jobs logic from the EventBusService to its own service JobSchedulerService

View File

@@ -1,128 +0,0 @@
# How to Create a Cron Job
In this document, youll learn how to create a cron job in Medusa.
## Overview
Medusa allows you to create cron jobs that run at specific times during your servers lifetime. For example, you can synchronize your inventory with an Enterprise Resource Planning (ERP) system once a day.
This guide explains how to create a cron job on your Medusa server. The cron job in this example will simply change the status of draft products to `published`.
## Prerequisites
### Medusa Components
It is assumed that you already have a Medusa server installed and set up. If not, you can follow the [quickstart guide](../../../quickstart/quick-start.md) to get started.
### Redis
Redis is required for cron jobs to work. Make sure you [install Redis](../../../tutorial/0-set-up-your-development-environment.mdx#redis) and [configure it with your Medusa server](../../../usage/configurations.md#redis).
## 1. Create a File
Each cron job should reside in a TypeScript or JavaScript file under the `src/loaders` directory.
Start by creating the `src/loaders` directory. Then, inside that directory, create the JavaScript or TypeScript file that youll add the cron job in. You can use any name for the file.
For the example in this tutorial, you can create the file `src/loaders/publish.ts`.
## 2. Create Cron Job
To create a cron job, add the following code in the file you created, which is `src/loaders/publish.ts` in this example:
```ts title=src/loaders/publish.ts
const publishJob = async (container, options) => {
const eventBus = container.resolve("eventBusService");
eventBus.createCronJob("publish-products", {}, "0 0 * * *", async () => {
//job to execute
const productService = container.resolve("productService");
const draftProducts = await productService.list({
status: 'draft'
});
for (const product of draftProducts) {
await productService.update(product.id, {
status: 'published'
});
}
})
}
export default publishJob;
```
This file should export a function that accepts a `container` and `options` parameters. `container` is the dependency container that you can use to resolve services, such as the [EventBusService](../../../references/services/classes/EventBusService.md). `options` are the plugins options if this cron job is created in a plugin.
You then resolve the `EventBusService` and use the `eventBus.createCronJob` method to create the cron job. This method accepts four parameters:
- The first parameter is a unique name to give to the cron job. In the example above, you use the name `publish-products`;
- The second parameter is an object which can be used to [pass data to the job](#pass-data-to-the-cron-job);
- The third parameter is the cron job expression pattern. In this example, it will execute the cron job once a day at 12 AM.
- The fourth parameter is the function to execute. This is where you add the code to execute once the cron job runs. In this example, you retrieve the draft products using the [ProductService](../../../references/services/classes/ProductService.md) and update the status of each of these products to `published`.
:::tip
You can see examples of cron job expression patterns on [crontab guru](https://crontab.guru/examples.html).
:::
### Pass Data to the Cron Job
To pass data to your cron job, you can add them to the object passed as a second parameter under the `data` property. This is helpful if you use one function to handle multiple cron jobs.
For example:
```ts
eventBus.createCronJob("publish-products", {
data: {
productId
}
}, "0 0 * * *", async (job) => {
console.log(job.data); // {productId: 'prod_124...'}
//...
});
```
## 3. Run Medusa Server
:::info
Cron Jobs only run while the Medusa server is running.
:::
In your terminal run the following command to run your Medusa server:
```bash npm2yarn
npm run start
```
This builds your code under the `src` directory into the `dist` directory, then runs the Medusa server.
If the cron job was registered successfully, you should see a message similar to this logged on your Medusa server:
```bash
Registering publish-products
```
Where `publish-products` is the unique name you provided to the cron job.
Once it is time to run your cron job based on the cron job expression pattern, the cron job will run and you can see it logged on your Medusa server.
For example, the above cron job will run at 12 AM and, when it runs, you can see the following logged on your Medusa server:
```bash noReport
info: Processing cron job: publish-products
```
If you log anything in the cron job, for example using `console.log`, or if any errors are thrown, itll also be logged on your Medusa server.
:::tip
To test the previous example out instantly, you can change the cron job expression pattern passed as the third parameter to `eventBus.createCronJob` to `* * * * *`. This will run the cron job every minute.
:::
## Whats Next
- Learn more about [services and how you can use them](../services/overview.md).

View File

@@ -0,0 +1,134 @@
# How to Create a Scheduled Job
In this document, youll learn how to create a scheduled job in Medusa.
## Overview
Medusa allows you to create scheduled jobs that run at specific times during your servers lifetime. For example, you can synchronize your inventory with an Enterprise Resource Planning (ERP) system once a day.
This guide explains how to create a scheduled job on your Medusa server. The scheduled job in this example will simply change the status of draft products to `published`.
## Prerequisites
### Medusa Components
It is assumed that you already have a Medusa server installed and set up. If not, you can follow the [quickstart guide](../../../quickstart/quick-start.md) to get started.
### Redis
Redis is required for scheduled jobs to work. Make sure you [install Redis](../../../tutorial/0-set-up-your-development-environment.mdx#redis) and [configure it with your Medusa server](../../../usage/configurations.md#redis).
## 1. Create a File
Each scheduled job should reside in a TypeScript or JavaScript file under the `src/loaders` directory.
Start by creating the `src/loaders` directory. Then, inside that directory, create the JavaScript or TypeScript file that youll add the scheduled job in. You can use any name for the file.
For the example in this tutorial, you can create the file `src/loaders/publish.ts`.
## 2. Create Cron Job
To create a scheduled job, add the following code in the file you created, which is `src/loaders/publish.ts` in this example:
```ts title=src/loaders/publish.ts
const publishJob = async (container, options) => {
const jobSchedulerService = container.resolve("jobSchedulerService");
jobSchedulerService.create("publish-products", {}, "0 0 * * *", async () => {
//job to execute
const productService = container.resolve("productService");
const draftProducts = await productService.list({
status: 'draft'
});
for (const product of draftProducts) {
await productService.update(product.id, {
status: 'published'
});
}
})
}
export default publishJob;
```
:::info
The service taking care of background jobs was renamed in v1.7.1. If you are running a previous version, use `eventBusService` instead of `jobSchedulerService`.
:::
This file should export a function that accepts a `container` and `options` parameters. `container` is the dependency container that you can use to resolve services, such as the JobSchedulerService. `options` are the plugins options if this scheduled job is created in a plugin.
You then resolve the `JobSchedulerService` and use the `jobSchedulerService.create` method to create the scheduled job. This method accepts four parameters:
- The first parameter is a unique name to give to the scheduled job. In the example above, you use the name `publish-products`;
- The second parameter is an object which can be used to [pass data to the job](#pass-data-to-the-scheduled-job);
- The third parameter is the scheduled job expression pattern. In this example, it will execute the scheduled job once a day at 12 AM.
- The fourth parameter is the function to execute. This is where you add the code to execute once the scheduled job runs. In this example, you retrieve the draft products using the [ProductService](../../../references/services/classes/ProductService.md) and update the status of each of these products to `published`.
:::tip
You can see examples of scheduled job expression patterns on [crontab guru](https://crontab.guru/examples.html).
:::
### Pass Data to the Cron Job
To pass data to your scheduled job, you can add them to the object passed as a second parameter under the `data` property. This is helpful if you use one function to handle multiple scheduled jobs.
For example:
```ts
jobSchedulerService.create("publish-products", {
data: {
productId
}
}, "0 0 * * *", async (job) => {
console.log(job.data); // {productId: 'prod_124...'}
//...
});
```
## 3. Run Medusa Server
:::info
Cron Jobs only run while the Medusa server is running.
:::
In your terminal run the following command to run your Medusa server:
```bash npm2yarn
npm run start
```
This builds your code under the `src` directory into the `dist` directory, then runs the Medusa server.
If the scheduled job was registered successfully, you should see a message similar to this logged on your Medusa server:
```bash
Registering publish-products
```
Where `publish-products` is the unique name you provided to the scheduled job.
Once it is time to run your scheduled job based on the scheduled job expression pattern, the scheduled job will run and you can see it logged on your Medusa server.
For example, the above scheduled job will run at 12 AM and, when it runs, you can see the following logged on your Medusa server:
```bash noReport
info: Processing scheduled job: publish-products
```
If you log anything in the scheduled job, for example using `console.log`, or if any errors are thrown, itll also be logged on your Medusa server.
:::tip
To test the previous example out instantly, you can change the scheduled job expression pattern passed as the third parameter to `jobSchedulerService.create` to `* * * * *`. This will run the scheduled job every minute.
:::
## Whats Next
- Learn more about [services and how you can use them](../services/overview.md).

View File

@@ -3,10 +3,10 @@ const inventorySync = async (container, options) => {
return return
} else { } else {
const brightpearlService = container.resolve("brightpearlService") const brightpearlService = container.resolve("brightpearlService")
const eventBus = container.resolve("eventBusService") const jobSchedulerService = container.resolve("jobSchedulerService")
try { try {
const pattern = options.inventory_sync_cron const pattern = options.inventory_sync_cron
eventBus.createCronJob("inventory-sync", {}, pattern, () => jobSchedulerService.create("inventory-sync", {}, pattern, () =>
brightpearlService.syncInventory() brightpearlService.syncInventory()
) )
} catch (err) { } catch (err) {

View File

@@ -1,7 +1,7 @@
import Bull from "bull" import Bull from "bull"
import { MockRepository, MockManager } from "medusa-test-utils" import { MockManager, MockRepository } from "medusa-test-utils"
import EventBusService from "../event-bus"
import config from "../../loaders/config" import config from "../../loaders/config"
import EventBusService from "../event-bus"
jest.genMockFromModule("bull") jest.genMockFromModule("bull")
jest.mock("bull") jest.mock("bull")
@@ -36,7 +36,7 @@ describe("EventBusService", () => {
}) })
it("creates bull queue", () => { it("creates bull queue", () => {
expect(Bull).toHaveBeenCalledTimes(2) expect(Bull).toHaveBeenCalledTimes(1)
expect(Bull).toHaveBeenCalledWith("EventBusService:queue", { expect(Bull).toHaveBeenCalledWith("EventBusService:queue", {
createClient: expect.any(Function), createClient: expect.any(Function),
}) })
@@ -97,7 +97,8 @@ describe("EventBusService", () => {
}) })
describe("emit", () => { describe("emit", () => {
let eventBus, job let eventBus
let job
describe("successfully adds job to queue", () => { describe("successfully adds job to queue", () => {
beforeAll(() => { beforeAll(() => {
jest.resetAllMocks() jest.resetAllMocks()
@@ -126,7 +127,8 @@ describe("EventBusService", () => {
}) })
describe("worker", () => { describe("worker", () => {
let eventBus, result let eventBus
let result
describe("successfully runs the worker", () => { describe("successfully runs the worker", () => {
beforeAll(async () => { beforeAll(async () => {
jest.resetAllMocks() jest.resetAllMocks()
@@ -134,11 +136,14 @@ describe("EventBusService", () => {
find: () => Promise.resolve([]), find: () => Promise.resolve([]),
}) })
eventBus = new EventBusService({ eventBus = new EventBusService(
manager: MockManager, {
stagedJobRepository, manager: MockManager,
logger: loggerMock, stagedJobRepository,
}, {}) logger: loggerMock,
},
{}
)
eventBus.subscribe("eventName", () => Promise.resolve("hi")) eventBus.subscribe("eventName", () => Promise.resolve("hi"))
result = await eventBus.worker_({ result = await eventBus.worker_({
data: { eventName: "eventName", data: {} }, data: { eventName: "eventName", data: {} },

View File

@@ -0,0 +1,95 @@
import Bull from "bull"
import config from "../../loaders/config"
import JobSchedulerService from "../job-scheduler"
jest.genMockFromModule("bull")
jest.mock("bull")
jest.mock("../../loaders/config")
config.redisURI = "testhost"
const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
}
describe("JobSchedulerService", () => {
describe("constructor", () => {
let jobScheduler
beforeAll(() => {
jest.resetAllMocks()
jobScheduler = new JobSchedulerService({
logger: loggerMock,
})
})
it("creates bull queue", () => {
expect(Bull).toHaveBeenCalledTimes(1)
expect(Bull).toHaveBeenCalledWith("scheduled-jobs:queue", {
createClient: expect.any(Function),
})
})
})
describe("create", () => {
let jobScheduler
describe("successfully creates scheduled job and add handler", () => {
beforeAll(() => {
jest.resetAllMocks()
jobScheduler = new JobSchedulerService({
logger: loggerMock,
})
jobScheduler.create(
"eventName",
{ data: "test" },
"* * * * *",
() => "test"
)
})
it("added the handler to the job queue", () => {
expect(jobScheduler.handlers_.get("eventName").length).toEqual(1)
})
})
describe("scheduledJobWorker", () => {
let jobScheduler
let result
describe("successfully runs the worker", () => {
beforeAll(async () => {
jest.resetAllMocks()
jobScheduler = new JobSchedulerService(
{
logger: loggerMock,
},
{}
)
jobScheduler.create("eventName", { data: "test" }, "* * * * *", () =>
Promise.resolve("hi")
)
result = await jobScheduler.scheduledJobsWorker({
data: { eventName: "eventName", data: {} },
})
})
it("calls logger", () => {
expect(loggerMock.info).toHaveBeenCalled()
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing scheduled job: eventName"
)
})
it("returns array with hi", async () => {
expect(result).toEqual(["hi"])
})
})
})
})
})

View File

@@ -1,15 +1,17 @@
import Bull from "bull" import Bull from "bull"
import Redis from "ioredis" import Redis from "ioredis"
import { EntityManager } from "typeorm" import { EntityManager } from "typeorm"
import { ConfigModule, Logger } from "../types/global"
import { StagedJobRepository } from "../repositories/staged-job"
import { StagedJob } from "../models" import { StagedJob } from "../models"
import { StagedJobRepository } from "../repositories/staged-job"
import { ConfigModule, Logger } from "../types/global"
import { sleep } from "../utils/sleep" import { sleep } from "../utils/sleep"
import JobSchedulerService from "./job-scheduler"
type InjectedDependencies = { type InjectedDependencies = {
manager: EntityManager manager: EntityManager
logger: Logger logger: Logger
stagedJobRepository: typeof StagedJobRepository stagedJobRepository: typeof StagedJobRepository
jobSchedulerService: JobSchedulerService
redisClient: Redis.Redis redisClient: Redis.Redis
redisSubscriber: Redis.Redis redisSubscriber: Redis.Redis
} }
@@ -34,11 +36,10 @@ export default class EventBusService {
protected readonly manager_: EntityManager protected readonly manager_: EntityManager
protected readonly logger_: Logger protected readonly logger_: Logger
protected readonly stagedJobRepository_: typeof StagedJobRepository protected readonly stagedJobRepository_: typeof StagedJobRepository
protected readonly jobSchedulerService_: JobSchedulerService
protected readonly observers_: Map<string | symbol, Subscriber[]> protected readonly observers_: Map<string | symbol, Subscriber[]>
protected readonly cronHandlers_: Map<string | symbol, Subscriber[]>
protected readonly redisClient_: Redis.Redis protected readonly redisClient_: Redis.Redis
protected readonly redisSubscriber_: Redis.Redis protected readonly redisSubscriber_: Redis.Redis
protected readonly cronQueue_: Bull
protected queue_: Bull protected queue_: Bull
protected shouldEnqueuerRun: boolean protected shouldEnqueuerRun: boolean
protected transactionManager_: EntityManager | undefined protected transactionManager_: EntityManager | undefined
@@ -79,14 +80,10 @@ export default class EventBusService {
this.observers_ = new Map() this.observers_ = new Map()
this.queue_ = new Bull(`${this.constructor.name}:queue`, opts) this.queue_ = new Bull(`${this.constructor.name}:queue`, opts)
this.cronHandlers_ = new Map()
this.redisClient_ = redisClient this.redisClient_ = redisClient
this.redisSubscriber_ = redisSubscriber this.redisSubscriber_ = redisSubscriber
this.cronQueue_ = new Bull(`cron-jobs:queue`, opts)
// Register our worker to handle emit calls // Register our worker to handle emit calls
this.queue_.process(this.worker_) this.queue_.process(this.worker_)
// Register cron worker
this.cronQueue_.process(this.cronWorker_)
if (process.env.NODE_ENV !== "test") { if (process.env.NODE_ENV !== "test") {
this.startEnqueuer() this.startEnqueuer()
@@ -103,6 +100,7 @@ export default class EventBusService {
{ {
manager: transactionManager, manager: transactionManager,
stagedJobRepository: this.stagedJobRepository_, stagedJobRepository: this.stagedJobRepository_,
jobSchedulerService: this.jobSchedulerService_,
logger: this.logger_, logger: this.logger_,
redisClient: this.redisClient_, redisClient: this.redisClient_,
redisSubscriber: this.redisSubscriber_, redisSubscriber: this.redisSubscriber_,
@@ -157,27 +155,6 @@ export default class EventBusService {
return this return this
} }
/**
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @return this
*/
protected registerCronHandler_(
event: string | symbol,
subscriber: Subscriber
): this {
if (typeof subscriber !== "function") {
throw new Error("Handler must be a function")
}
const cronHandlers = this.cronHandlers_.get(event) ?? []
this.cronHandlers_.set(event, [...cronHandlers, subscriber])
return this
}
/** /**
* Calls all subscribers when an event occurs. * Calls all subscribers when an event occurs.
* @param {string} eventName - the name of the event to be process. * @param {string} eventName - the name of the event to be process.
@@ -198,7 +175,7 @@ export default class EventBusService {
const stagedJobInstance = stagedJobRepository.create({ const stagedJobInstance = stagedJobRepository.create({
event_name: eventName, event_name: eventName,
data, data,
}) } as StagedJob)
return await stagedJobRepository.save(stagedJobInstance) return await stagedJobRepository.save(stagedJobInstance)
} else { } else {
const opts: { removeOnComplete: boolean } & EmitOptions = { const opts: { removeOnComplete: boolean } & EmitOptions = {
@@ -288,32 +265,9 @@ export default class EventBusService {
) )
} }
/**
* Handles incoming jobs.
* @param job The job object
* @return resolves to the results of the subscriber calls.
*/
cronWorker_ = async <T>(job: {
data: { eventName: string; data: T }
}): Promise<unknown[]> => {
const { eventName, data } = job.data
const observers = this.cronHandlers_.get(eventName) || []
this.logger_.info(`Processing cron job: ${eventName}`)
return await Promise.all(
observers.map(async (subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occured while processing ${eventName}: ${err}`
)
return err
})
})
)
}
/** /**
* Registers a cron job. * Registers a cron job.
* @deprecated All cron job logic has been refactored to the `JobSchedulerService`. This method will be removed in a future release.
* @param eventName - the name of the event * @param eventName - the name of the event
* @param data - the data to be sent with the event * @param data - the data to be sent with the event
* @param cron - the cron pattern * @param cron - the cron pattern
@@ -326,14 +280,6 @@ export default class EventBusService {
cron: string, cron: string,
handler: Subscriber handler: Subscriber
): void { ): void {
this.logger_.info(`Registering ${eventName}`) this.jobSchedulerService_.create(eventName, data, cron, handler)
this.registerCronHandler_(eventName, handler)
return this.cronQueue_.add(
{
eventName,
data,
},
{ repeat: { cron } }
)
} }
} }

View File

@@ -0,0 +1,122 @@
import Bull from "bull"
import Redis from "ioredis"
import { ConfigModule, Logger } from "../types/global"
type InjectedDependencies = {
logger: Logger
redisClient: Redis.Redis
redisSubscriber: Redis.Redis
}
type ScheduledJobHandler<T = unknown> = (
data: T,
eventName: string
) => Promise<void>
export default class JobSchedulerService {
protected readonly config_: ConfigModule
protected readonly logger_: Logger
protected readonly handlers_: Map<string | symbol, ScheduledJobHandler[]> =
new Map()
protected readonly queue_: Bull
constructor(
{ logger, redisClient, redisSubscriber }: InjectedDependencies,
config: ConfigModule,
singleton = true
) {
this.config_ = config
this.logger_ = logger
if (singleton) {
const opts = {
createClient: (type: string): Redis.Redis => {
switch (type) {
case "client":
return redisClient
case "subscriber":
return redisSubscriber
default:
if (config.projectConfig.redis_url) {
return new Redis(config.projectConfig.redis_url)
}
return redisClient
}
},
}
this.queue_ = new Bull(`scheduled-jobs:queue`, opts)
// Register scheduled job worker
this.queue_.process(this.scheduledJobsWorker)
}
}
/**
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @return this
*/
protected registerHandler(
event: string | symbol,
handler: ScheduledJobHandler
): void {
if (typeof handler !== "function") {
throw new Error("Handler must be a function")
}
const handlers = this.handlers_.get(event) ?? []
this.handlers_.set(event, [...handlers, handler])
}
/**
* Handles incoming scheduled jobs.
* @param job The job object
* @return resolves to the results of the subscriber calls.
*/
protected scheduledJobsWorker = async <T>(job: {
data: { eventName: string; data: T }
}): Promise<unknown[]> => {
const { eventName, data } = job.data
const observers = this.handlers_.get(eventName) || []
this.logger_.info(`Processing scheduled job: ${eventName}`)
return await Promise.all(
observers.map(async (subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occured while processing ${eventName}: ${err}`
)
return err
})
})
)
}
/**
* Registers a scheduled job.
* @param eventName - the name of the event
* @param data - the data to be sent with the event
* @param schedule - the schedule expression
* @param handler - the handler to call on the job
* @return void
*/
create<T>(
eventName: string,
data: T,
schedule: string,
handler: ScheduledJobHandler
): void {
this.logger_.info(`Registering ${eventName}`)
this.registerHandler(eventName, handler)
this.queue_.add(
{
eventName,
data,
},
{ repeat: { cron: schedule } }
)
}
}

View File

@@ -335,8 +335,8 @@ module.exports = {
}, },
{ {
type: "doc", type: "doc",
id: "advanced/backend/cron-jobs/create", id: "advanced/backend/scheduled-jobs/create",
label: "Create a Cron Job" label: "Create a Scheduled Job"
}, },
{ {
type: "doc", type: "doc",