feat(medusa): Reliable retrying of jobs (#2947)

Oliver Windall Juhl
2023-01-09 11:56:29 +01:00
committed by GitHub
parent e4af968531
commit 32b038fc3f
9 changed files with 480 additions and 69 deletions

@@ -0,0 +1,5 @@
---
"@medusajs/medusa": patch
---
feat(medusa): "Idempotent" retries of job subscribers

@@ -0,0 +1,223 @@
# Events Architecture
In this document, you'll learn how the events system is implemented in Medusa.
## Overview
The events system in Medusa is built on a publish/subscribe architecture. The Medusa core publishes events when certain actions take place.
Those events can be subscribed to using subscribers. When you subscribe to an event, you can perform a task asynchronously every time the event is triggered.
:::info
You can learn more about subscribers and their use cases in the [Subscribers](../subscribers/overview.md) documentation.
:::
---
## Publishing and Subscribing
The `EventBusService` is responsible for publishing and processing events.
:::note
The current implementation of the `EventBusService` is powered by Redis. However, an upcoming version of Medusa introduces an event bus module. This will allow you to use any publishing and subscribing provider. That will not change the general purpose and flow of the `EventBusService`.
:::
The `EventBusService` exposes two methods in its public API for event processing: `emit` and `subscribe`.
### emit
The `emit` method accepts as a first parameter the event name. It adds it to a Bull queue (powered by Redis) as a job, and processes it asynchronously.
The second parameter contains any data that should be emitted with the event. Subscribers that handle the event will receive that data as a method parameter.
The third parameter is an options object. It accepts options related to the number of attempts if a subscriber handling the event fails, the delay time, and more. The options are explained in a [later section](#retrying-handlers).
The `emit` method has the following signature:
```ts
export default class EventBusService {
// ...
async emit<T>(
eventName: string,
data: T,
options: Record<string, unknown> & EmitOptions = { attempts: 1 }
): Promise<StagedJob | void>
}
```
Here's an example of how you can emit an event using the `EventBusService`:
```ts
eventBusService.emit(
"product.created",
{ id: "prod_..." },
{ attempts: 2 }
)
```
The `EventBusService` emits the event `product.created` by passing the event name as the first argument. The second argument is an object holding the data passed to the event handler methods in subscribers. Here, it contains the ID of the product.
Options are passed in the third argument. The `attempts` property specifies how many times in total the job is attempted, including the first run. By default it's one, which means failed subscribers aren't retried.
### subscribe
The `subscribe` method attaches a handler method to the specified event. The handler runs when the event is triggered. It is usually used inside a subscriber class.
The `subscribe` method accepts the event name as the first parameter. This is the event that the handler method will attach to.
The second parameter is the handler method that will be triggered when the event is emitted.
The third parameter is an optional `context` object. Its `subscriberId` property lets you set the ID of the handler method.
The `subscribe` method has the following signature:
```ts
export default class EventBusService {
// ...
subscribe(
event: string | symbol,
subscriber: Subscriber,
context?: SubscriberContext
): this
}
```
Here's an example of how you can subscribe to an event using the `EventBusService`:
```ts title=src/subscribers/my.ts
import { EventBusService } from "@medusajs/medusa"
class MySubscriber {
constructor({
eventBusService,
}: { eventBusService: EventBusService }) {
eventBusService.subscribe("product.created", (data) => {
// TODO handle event
console.log(data.id)
})
}
}
```
In the constructor of a subscriber, you use the `EventBusService` to subscribe to the event `product.created`. In the handler method, you can perform a task every time a product is created. Notice how the handler method accepts the `data` as a parameter, as explained in the previous section.
:::note
You can learn more about how to create a subscriber in [this documentation](../subscribers/create-subscriber.md).
:::
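The following is a minimal, self-contained sketch of how handler IDs could be assigned on subscription, loosely mirroring the `subscribe` method in this commit. It is not the Medusa implementation: the `subscribersByEvent` registry and the counter-based ID generator are hypothetical stand-ins (the commit itself uses `ulid()`), and handlers are synchronous for brevity.

```ts
// Hypothetical, simplified stand-ins for the types used by EventBusService.
type Handler = (data: unknown) => void
type SubscriberDescriptor = { id: string; subscriber: Handler }

const subscribersByEvent: Record<string, SubscriberDescriptor[]> = {}
let generatedIds = 0 // assumption: a counter replaces ulid() to keep the sketch dependency-free

function subscribe(
  event: string,
  subscriber: Handler,
  context?: { subscriberId: string }
): string {
  // Use the ID from the context if given, otherwise generate one.
  const id = context?.subscriberId ?? `${event}-${++generatedIds}`
  const existing = subscribersByEvent[event] || []

  // Two handlers on the same event can't share an ID.
  if (existing.some((sub) => sub.id === id)) {
    throw new Error(`Subscriber with id ${id} already exists`)
  }

  subscribersByEvent[event] = existing.concat([{ id, subscriber }])
  return id
}

const explicitId = subscribe("product.created", () => {}, {
  subscriberId: "my-subscriber",
})
const generatedId = subscribe("product.created", () => {})

let duplicateError = ""
try {
  subscribe("product.created", () => {}, { subscriberId: "my-subscriber" })
} catch (err) {
  duplicateError = err instanceof Error ? err.message : String(err)
}
```

Here the first two calls succeed, while the third is rejected because `my-subscriber` is already registered for `product.created`.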
---
## Processing Events
In the `EventBusService` service, the `worker_` method defines the logic run for each event emitted into the queue.
By default, all handler methods attached to that event are retrieved and, for each of them, the stored data provided as the second parameter of `emit` is passed as an argument.
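As a rough illustration of that dispatch step, here is a self-contained sketch. It assumes, as the `worker_` code in this commit suggests, that handlers subscribed to `*` run for every event and that a failing handler's error is collected rather than thrown. The `handlersByEvent` registry and the synchronous, string-returning handlers are hypothetical simplifications.

```ts
// Hypothetical handler shape: real subscribers are async and return a Promise.
type Handler = (data: unknown, eventName: string) => string

const handlersByEvent: Record<string, Handler[]> = {
  "product.created": [(_data, eventName) => `indexed after ${eventName}`],
  "*": [(_data, eventName) => `audited ${eventName}`],
}

function dispatch(eventName: string, data: unknown): string[] {
  const eventHandlers = handlersByEvent[eventName] || []
  const wildcardHandlers = handlersByEvent["*"] || []

  // Every handler receives the emitted data and the event name.
  return eventHandlers.concat(wildcardHandlers).map((handler) => {
    try {
      return handler(data, eventName)
    } catch (err) {
      // A failure in one handler doesn't stop the others.
      return `failed: ${String(err)}`
    }
  })
}

const created = dispatch("product.created", { id: "prod_..." })
// ["indexed after product.created", "audited product.created"]
const placed = dispatch("order.placed", {})
// ["audited order.placed"]
```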
---
## Retrying Handlers
A handler method might fail to process an event. This could happen because it communicates with a third-party service that is currently down, or because of an error in its logic.
In some cases, you might want to retry those failed handlers.
As briefly explained earlier, you can pass options as a third argument when emitting an event. These options configure how the queue worker processes your job. If you pass `attempts` with a value greater than one upon emitting the event, the processing of a handler method is retried when it fails.
Aside from `attempts`, there are other options to further configure the retry mechanism:
```ts
type EmitOptions = {
delay?: number
attempts: number
backoff?: {
type: "fixed" | "exponential"
delay: number
}
}
```
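Here's what each of these options means:
- `delay`: delay the triggering of the handler methods by a number of milliseconds.
- `attempts`: the total number of times the job is attempted, including the first run. Failed subscriber handlers are retried only when this is greater than one.
- `backoff`: the wait strategy between retries, made up of a `type` (`fixed` or `exponential`) and a `delay` in milliseconds.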
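To make the effect of `backoff` concrete, the sketch below estimates the wait before each retry. `retryWaitTimes` is a hypothetical helper, not part of Medusa or Bull. It assumes Bull's built-in strategies behave as follows: `fixed` waits `delay` milliseconds every time, and `exponential` waits `(2^attemptsMade - 1) * delay` milliseconds. Treat the numbers as an approximation and check Bull's documentation for the version you run.

```ts
// Shape copied from the EmitOptions type above.
type EmitOptions = {
  delay?: number
  attempts: number
  backoff?: {
    type: "fixed" | "exponential"
    delay: number
  }
}

// Hypothetical helper: returns the assumed wait (in ms) before each retry.
function retryWaitTimes(options: EmitOptions): number[] {
  const waits: number[] = []
  // `attempts` includes the first run, so there are at most attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < options.attempts; attemptsMade++) {
    if (!options.backoff) {
      waits.push(0) // no backoff configured: retry without waiting
    } else if (options.backoff.type === "fixed") {
      waits.push(options.backoff.delay)
    } else {
      // Assumed exponential formula: (2^attemptsMade - 1) * delay
      waits.push(
        Math.round((Math.pow(2, attemptsMade) - 1) * options.backoff.delay)
      )
    }
  }
  return waits
}

const fixedWaits = retryWaitTimes({
  attempts: 3,
  backoff: { type: "fixed", delay: 1000 },
})
// [1000, 1000]

const exponentialWaits = retryWaitTimes({
  attempts: 4,
  backoff: { type: "exponential", delay: 1000 },
})
// [1000, 3000, 7000]
```

With `attempts: 1`, the helper returns an empty list, which matches the default behavior of not retrying.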
### Note on Subscriber IDs
If you have more than one handler method attached to a single event, or if you have multiple server instances running, you must pass a subscriber ID in the `context` object given as the third parameter to the `subscribe` method. This allows the `EventBusService` to differentiate between handler methods when retrying a failed one.
If a subscriber ID is not passed on subscription, all handler methods are run again. This can lead to data inconsistencies or general unwanted behavior in your system.
On the other hand, if you want all handler methods to run again when one of them fails, you can omit passing a subscriber ID.
An example of passing a subscriber ID:
```ts title=src/subscribers/my.ts
import { EventBusService } from "@medusajs/medusa"
class MySubscriber {
constructor({
eventBusService,
}: { eventBusService: EventBusService }) {
eventBusService.subscribe(
"product.created",
(data) => {
// TODO handle event
console.log(data.id)
},
{ subscriberId: "my-unique-subscriber" }
)
}
}
```
:::info
You can learn more about subscriber IDs in [Bull's documentation](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueadd).
:::
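The sketch below illustrates why stable subscriber IDs matter on retry. It loosely follows the `worker_` logic in this commit, where the IDs of handlers that succeeded are stored in the job data as `completedSubscriberIds` and skipped on the next attempt. The `runAttempt` function, the handler names, and the synchronous handlers are hypothetical simplifications.

```ts
// Hypothetical, simplified types: real handlers are async.
type Handler = (data: unknown) => void
type SubscriberDescriptor = { id: string; subscriber: Handler }
type JobState = { completedSubscriberIds: string[] }

// Runs one attempt and returns true if any handler failed.
function runAttempt(
  subscribers: SubscriberDescriptor[],
  job: JobState,
  data: unknown
): boolean {
  // Skip handlers that already completed in an earlier attempt.
  const pending = subscribers.filter(
    (sub) => job.completedSubscriberIds.indexOf(sub.id) === -1
  )

  let anyFailed = false
  for (const sub of pending) {
    try {
      sub.subscriber(data)
      job.completedSubscriberIds.push(sub.id) // remember the success
    } catch (err) {
      anyFailed = true
    }
  }
  return anyFailed
}

const calls = { email: 0, inventory: 0 }
let inventoryServiceUp = false

const subscribers: SubscriberDescriptor[] = [
  { id: "send-email", subscriber: () => { calls.email++ } },
  {
    id: "sync-inventory",
    subscriber: () => {
      calls.inventory++
      if (!inventoryServiceUp) {
        throw new Error("inventory service is down")
      }
    },
  },
]

const job: JobState = { completedSubscriberIds: [] }
const firstAttemptFailed = runAttempt(subscribers, job, { id: "prod_..." })

inventoryServiceUp = true // the third-party service recovers
const secondAttemptFailed = runAttempt(subscribers, job, { id: "prod_..." })
```

After both attempts, the `send-email` handler has run once and the `sync-inventory` handler twice. Only the failed handler was retried.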
---
## Database transactions
<!-- vale docs.Acronyms = NO -->
Transactions in Medusa ensure atomicity, consistency, isolation, and durability, or ACID, guarantees for operations in the Medusa core.
<!-- vale docs.Acronyms = YES -->
In many cases, [services](../services/overview.md) update resources in the database and emit an event within a transactional operation. To ensure that these events don't cause data inconsistencies (for example, a plugin subscribes to an event to contact a third-party service, but the transaction fails), the concept of a staged job is introduced.
Instead of events being processed immediately, they're stored in the database as staged jobs until they're ready, that is, until the transaction has succeeded.
This rather complex logic is abstracted away from the consumers of the `EventBusService`, but here's an example of the flow when an API request is made:
1. API request starts.
2. Transaction is initiated.
3. Service layer performs some logic.
4. Events are emitted and stored in the database for eventual processing.
5. Transaction is committed.
6. API request ends.
7. Events in the database become visible.
To pull staged jobs from the database, a separate enqueuer polls the database every three seconds to discover new visible jobs. These jobs are then added to the queue and processed as described in the [Processing](#processing-events) section earlier.
:::info
This pattern is heavily inspired by the [Transactionally-staged Job Drain described in this blog post](https://brandur.org/job-drain).
:::
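The flow described above can be sketched with in-memory stand-ins. This is not how Medusa implements it: `StagedJobStore` is a hypothetical class that plays the role of the `staged_job` table, a plain array plays the role of the Bull queue, and the product IDs are made up. A transaction that throws discards its staged jobs, so they never reach the queue.

```ts
type StagedJob = { eventName: string; data: unknown }

// Hypothetical in-memory stand-in for the staged_job table.
class StagedJobStore {
  private committed: StagedJob[] = []

  // Jobs staged inside `work` become visible only if `work` doesn't throw.
  transaction(work: (stage: (job: StagedJob) => void) => void): void {
    const pending: StagedJob[] = []
    work((job) => {
      pending.push(job)
    })
    // Reached only on success. On an error, `pending` is dropped (rollback).
    this.committed = this.committed.concat(pending)
  }

  // One enqueuer poll: move visible jobs to the queue, then delete them.
  drainInto(queue: StagedJob[]): void {
    for (const job of this.committed) {
      queue.push(job)
    }
    this.committed = []
  }
}

const store = new StagedJobStore()
const queue: StagedJob[] = []

// A successful transaction: its event is staged and later enqueued.
store.transaction((stage) => {
  stage({ eventName: "product.created", data: { id: "prod_1" } })
})

// A failing transaction: its event is never enqueued.
let rolledBack = false
try {
  store.transaction((stage) => {
    stage({ eventName: "product.updated", data: { id: "prod_2" } })
    throw new Error("database write failed")
  })
} catch (err) {
  rolledBack = true
}

store.drainInto(queue)
const queuedEvents = queue.map((job) => job.eventName)
// ["product.created"]
```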
---
## See Also
- [Events reference](../subscribers/events-list.md)
- [Subscribers overview](../subscribers/overview.md)
- [How to create a subscriber](../subscribers/create-subscriber.md)

@@ -2902,5 +2902,6 @@ Object of the following format:
## See Also
- [Events architecture overview](../events/architecture.md)
- [Use services in subscribers](create-subscriber.md#using-services-in-subscribers)
- [Create a notification provider](../notification/overview.md)

@@ -27,4 +27,5 @@ Whenever an event is emitted, the subscribers registered handler method is ex
## See Also
- [Create a Subscriber](create-subscriber.md).
- [Events architecture overview](../events/architecture.md)
- [Events reference](events-list.md).

@@ -0,0 +1,15 @@
import { MigrationInterface, QueryRunner } from "typeorm"
export class stagedJobOptions1673003729870 implements MigrationInterface {
name = "stagedJobOptions1673003729870"
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "staged_job" ADD "options" jsonb NOT NULL DEFAULT '{}'::JSONB`
)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`ALTER TABLE "staged_job" DROP COLUMN "options"`)
}
}

@@ -14,6 +14,9 @@ export class StagedJob {
@DbAwareColumn({ type: "jsonb" })
data: Record<string, unknown>
@DbAwareColumn({ type: "jsonb", default: {} })
options: Record<string, unknown>
@BeforeInsert()
private beforeInsert(): void {
this.id = generateEntityId(this.id, "job")

@@ -45,27 +45,44 @@ describe("EventBusService", () => {
describe("subscribe", () => {
let eventBus
describe("successfully adds subscriber", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
})
eventBus = new EventBusService({
manager: MockManager,
stagedJobRepository,
logger: loggerMock,
})
eventBus.subscribe("eventName", () => "test")
beforeEach(() => {
jest.resetAllMocks()
eventBus = new EventBusService({
manager: MockManager,
logger: loggerMock,
})
afterAll(async () => {
await eventBus.stopEnqueuer()
})
afterAll(async () => {
await eventBus.stopEnqueuer()
})
it("throws when subscriber already exists", async () => {
expect.assertions(1)
eventBus.subscribe("eventName", () => "test", {
subscriberId: "my-subscriber",
})
it("added the subscriber to the queue", () => {
expect(eventBus.observers_.get("eventName").length).toEqual(1)
try {
eventBus.subscribe("eventName", () => "new", {
subscriberId: "my-subscriber",
})
} catch (error) {
expect(error.message).toBe(
"Subscriber with id my-subscriber already exists"
)
}
})
it("successfully adds subscriber", () => {
eventBus.subscribe("eventName", () => "test", {
subscriberId: "my-subscriber",
})
expect(eventBus.eventToSubscribersMap_.get("eventName").length).toEqual(1)
})
describe("fails when adding non-function subscriber", () => {
@@ -169,15 +186,12 @@ describe("EventBusService", () => {
let eventBus
beforeAll(async () => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
})
eventBus = new EventBusService({
manager: MockManager,
stagedJobRepository,
logger: loggerMock,
})
eventBus.subscribe("eventName", () => Promise.resolve("hi"))
eventBus.subscribe("eventName", () => Promise.resolve("hi2"))
eventBus.subscribe("eventName", () => Promise.resolve("hi3"))
@@ -187,14 +201,17 @@ describe("EventBusService", () => {
result = await eventBus.worker_({
data: { eventName: "eventName", data: {} },
update: (data) => data,
opts: { attempts: 1 },
})
})
afterAll(async () => {
await eventBus.stopEnqueuer()
})
it("calls logger warn on rejections", () => {
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledTimes(4)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
@@ -206,8 +223,10 @@ describe("EventBusService", () => {
)
})
it("returns result from all subscribers", async () => {
expect(result.length).toEqual(6)
it("calls logger warn from retry not kicking in", () => {
expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
)
})
})
})

@@ -1,6 +1,8 @@
import Bull from "bull"
import Redis from "ioredis"
import { isDefined } from "medusa-core-utils"
import { EntityManager } from "typeorm"
import { ulid } from "ulid"
import { StagedJob } from "../models"
import { StagedJobRepository } from "../repositories/staged-job"
import { ConfigModule, Logger } from "../types/global"
@@ -18,9 +20,29 @@ type InjectedDependencies = {
type Subscriber<T = unknown> = (data: T, eventName: string) => Promise<void>
type SubscriberContext = {
subscriberId: string
}
type BullJob<T> = {
update: (data: unknown) => void
attemptsMade: number
opts: EmitOptions
data: {
eventName: string
data: T
completedSubscriberIds: string[] | undefined
}
}
type SubscriberDescriptor = {
id: string
subscriber: Subscriber
}
type EmitOptions = {
delay?: number
attempts?: number
attempts: number
backoff?: {
type: "fixed" | "exponential"
delay: number
@@ -37,7 +59,10 @@ export default class EventBusService {
protected readonly logger_: Logger
protected readonly stagedJobRepository_: typeof StagedJobRepository
protected readonly jobSchedulerService_: JobSchedulerService
protected readonly observers_: Map<string | symbol, Subscriber[]>
protected readonly eventToSubscribersMap_: Map<
string | symbol,
SubscriberDescriptor[]
>
protected readonly redisClient_: Redis.Redis
protected readonly redisSubscriber_: Redis.Redis
protected queue_: Bull
@@ -80,7 +105,7 @@ export default class EventBusService {
},
}
this.observers_ = new Map()
this.eventToSubscribersMap_ = new Map()
this.queue_ = new Bull(`${this.constructor.name}:queue`, opts)
this.redisClient_ = redisClient
this.redisSubscriber_ = redisSubscriber
@@ -121,16 +146,44 @@ export default class EventBusService {
* Adds a function to a list of event subscribers.
* @param event - the event that the subscriber will listen for.
* @param subscriber - the function to be called when a certain event
* happens. Subscribers must return a Promise.
* @param context - context to use when attaching subscriber
* @return this
*/
subscribe(event: string | symbol, subscriber: Subscriber): this {
subscribe(
event: string | symbol,
subscriber: Subscriber,
context?: SubscriberContext
): this {
if (typeof subscriber !== "function") {
throw new Error("Subscriber must be a function")
}
const observers = this.observers_.get(event) ?? []
this.observers_.set(event, [...observers, subscriber])
/**
* If context is provided, we use the subscriberId from it
* otherwise we generate a random one using a ulid
*/
const subscriberId =
context?.subscriberId ?? `${event.toString()}-${ulid()}`
const newSubscriberDescriptor = { subscriber, id: subscriberId }
const existingSubscribers = this.eventToSubscribersMap_.get(event) ?? []
const subscriberAlreadyExists = existingSubscribers.find(
(sub) => sub.id === subscriberId
)
console.log(subscriberId)
if (subscriberAlreadyExists) {
throw Error(`Subscriber with id ${subscriberId} already exists`)
}
this.eventToSubscribersMap_.set(event, [
...existingSubscribers,
newSubscriberDescriptor,
])
return this
}
@@ -147,10 +200,15 @@ export default class EventBusService {
throw new Error("Subscriber must be a function")
}
if (this.observers_.get(event)?.length) {
const index = this.observers_.get(event)?.indexOf(subscriber)
if (index !== -1) {
this.observers_.get(event)?.splice(index as number, 1)
const existingSubscribers = this.eventToSubscribersMap_.get(event)
if (existingSubscribers?.length) {
const subIndex = existingSubscribers?.findIndex(
(sub) => sub.subscriber === subscriber
)
if (subIndex !== -1) {
this.eventToSubscribersMap_.get(event)?.splice(subIndex as number, 1)
}
}
@@ -167,33 +225,48 @@ export default class EventBusService {
async emit<T>(
eventName: string,
data: T,
options: EmitOptions = {}
options: Record<string, unknown> & EmitOptions = { attempts: 1 }
): Promise<StagedJob | void> {
const opts: { removeOnComplete: boolean } & EmitOptions = {
removeOnComplete: true,
attempts: 1,
}
if (typeof options.attempts === "number") {
opts.attempts = options.attempts
if (isDefined(options.backoff)) {
opts.backoff = options.backoff
}
}
if (typeof options.delay === "number") {
opts.delay = options.delay
}
/**
* If we are in an ongoing transaction, we store the jobs in the database
* instead of processing them immediately. We only want to process those
* events, if the transaction successfully commits. This is to avoid jobs
* being processed if the transaction fails.
*
* In case of a failing transaction, jobs stored in the database are removed
* as part of the rollback.
*/
if (this.transactionManager_) {
const stagedJobRepository = this.transactionManager_.getCustomRepository(
this.stagedJobRepository_
)
const stagedJobInstance = stagedJobRepository.create({
const jobToCreate = {
event_name: eventName,
data,
} as StagedJob)
data: data as unknown as Record<string, unknown>,
options: opts,
} as Partial<StagedJob>
const stagedJobInstance = stagedJobRepository.create(jobToCreate)
return await stagedJobRepository.save(stagedJobInstance)
} else {
const opts: { removeOnComplete: boolean } & EmitOptions = {
removeOnComplete: true,
}
if (typeof options.attempts === "number") {
opts.attempts = options.attempts
if (typeof options.backoff !== "undefined") {
opts.backoff = options.backoff
}
}
if (typeof options.delay === "number") {
opts.delay = options.delay
}
this.queue_.add({ eventName, data }, opts)
}
this.queue_.add({ eventName, data }, opts)
}
startEnqueuer(): void {
@@ -224,7 +297,7 @@ export default class EventBusService {
this.queue_
.add(
{ eventName: job.event_name, data: job.data },
{ removeOnComplete: true }
job.options ?? { removeOnComplete: true }
)
.then(async () => {
await stagedJobRepo.remove(job)
@@ -241,30 +314,96 @@ export default class EventBusService {
* @param job The job object
* @return resolves to the results of the subscriber calls.
*/
worker_ = async <T>(job: {
data: { eventName: string; data: T }
}): Promise<unknown[]> => {
worker_ = async <T>(job: BullJob<T>): Promise<unknown> => {
const { eventName, data } = job.data
const eventObservers = this.observers_.get(eventName) || []
const wildcardObservers = this.observers_.get("*") || []
const eventSubscribers = this.eventToSubscribersMap_.get(eventName) || []
const wildcardSubscribers = this.eventToSubscribersMap_.get("*") || []
const observers = eventObservers.concat(wildcardObservers)
const allSubscribers = eventSubscribers.concat(wildcardSubscribers)
this.logger_.info(
`Processing ${eventName} which has ${eventObservers.length} subscribers`
// Pull already completed subscribers from the job data
const completedSubscribers = job.data.completedSubscriberIds || []
// Filter out already completed subscribers from the all subscribers
const subscribersInCurrentAttempt = allSubscribers.filter(
(subscriber) =>
subscriber.id && !completedSubscribers.includes(subscriber.id)
)
return await Promise.all(
observers.map(async (subscriber) => {
return subscriber(data, eventName).catch((err) => {
this.logger_.warn(
`An error occurred while processing ${eventName}: ${err}`
)
console.error(err)
return err
})
const isRetry = job.attemptsMade > 0
const currentAttempt = job.attemptsMade + 1
const isFinalAttempt = job?.opts?.attempts === currentAttempt
if (isRetry) {
if (isFinalAttempt) {
this.logger_.info(`Final retry attempt for ${eventName}`)
}
this.logger_.info(
`Retrying ${eventName} which has ${eventSubscribers.length} subscribers (${subscribersInCurrentAttempt.length} of them failed)`
)
} else {
this.logger_.info(
`Processing ${eventName} which has ${eventSubscribers.length} subscribers`
)
}
const completedSubscribersInCurrentAttempt: string[] = []
const subscribersResult = await Promise.all(
subscribersInCurrentAttempt.map(async ({ id, subscriber }) => {
return subscriber(data, eventName)
.then((data) => {
// For every subscriber that completes successfully, add their id to the list of completed subscribers
completedSubscribersInCurrentAttempt.push(id)
return data
})
.catch((err) => {
this.logger_.warn(
`An error occurred while processing ${eventName}: ${err}`
)
return err
})
})
)
// If the number of completed subscribers is different from the number of subscribers to process in current attempt, some of them failed
const didSubscribersFail =
completedSubscribersInCurrentAttempt.length !==
subscribersInCurrentAttempt.length
const isRetriesConfigured = job?.opts?.attempts > 1
// Therefore, if retrying is configured, we try again
const shouldRetry =
didSubscribersFail && isRetriesConfigured && !isFinalAttempt
if (shouldRetry) {
const updatedCompletedSubscribers = [
...completedSubscribers,
...completedSubscribersInCurrentAttempt,
]
job.data.completedSubscriberIds = updatedCompletedSubscribers
job.update(job.data)
const errorMessage = `One or more subscribers of ${eventName} failed. Retrying...`
this.logger_.warn(errorMessage)
return Promise.reject(Error(errorMessage))
}
if (didSubscribersFail && !isFinalAttempt) {
// If retrying is not configured, we log a warning to allow server admins to recover manually
this.logger_.warn(
`One or more subscribers of ${eventName} failed. Retrying is not configured. Use 'attempts' option when emitting events.`
)
}
return Promise.resolve(subscribersResult)
}
/**

@@ -430,6 +430,11 @@ module.exports = {
id: "advanced/backend/subscribers/overview",
label: "Subscribers"
},
{
type: "doc",
id: "advanced/backend/events/architecture",
label: "Events Architecture"
},
{
type: "doc",
id: "advanced/backend/shipping/overview",