chore: ability to group events on redis event bus (#7655)

* chore: ability to group events on redis event bus

* chore: fix tests

* Update packages/modules/event-bus-redis/src/services/event-bus-redis.ts

Co-authored-by: Adrien de Peretti <adrien.deperetti@gmail.com>

* chore: change shape of input and body data

* chore: fix builds

* chore: address comments

* chore: fix unit test

---------

Co-authored-by: Adrien de Peretti <adrien.deperetti@gmail.com>
This commit is contained in:
Riqwan Thamir
2024-06-10 22:15:43 +02:00
committed by GitHub
parent 3b8160b564
commit 39ddba2491
24 changed files with 924 additions and 732 deletions

View File

@@ -1,318 +0,0 @@
import { Queue, Worker } from "bullmq"
import RedisEventBusService from "../event-bus-redis"
jest.genMockFromModule("bullmq")
jest.genMockFromModule("ioredis")
jest.mock("bullmq")
jest.mock("ioredis")
const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
}
const simpleModuleOptions = { redisUrl: "test-url" }
const moduleDeps = {
manager: {},
logger: loggerMock,
eventBusRedisConnection: {},
}
describe("RedisEventBusService", () => {
let eventBus
describe("constructor", () => {
beforeAll(() => {
jest.clearAllMocks()
})
it("Creates a queue + worker", () => {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "shared",
})
expect(Queue).toHaveBeenCalledTimes(1)
expect(Queue).toHaveBeenCalledWith("events-queue", {
connection: expect.any(Object),
prefix: "RedisEventBusService",
})
expect(Worker).toHaveBeenCalledTimes(1)
expect(Worker).toHaveBeenCalledWith(
"events-queue",
expect.any(Function),
{
connection: expect.any(Object),
prefix: "RedisEventBusService",
}
)
})
it("Throws on isolated module declaration", () => {
try {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "isolated",
})
} catch (error) {
expect(error.message).toEqual(
"At the moment this module can only be used with shared resources"
)
}
})
})
describe("emit", () => {
describe("Successfully emits events", () => {
beforeEach(() => {
jest.clearAllMocks()
})
it("Adds job to queue with default options", () => {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "shared",
})
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
eventBus.emit("eventName", { hi: "1234" })
expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1)
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 1,
removeOnComplete: true,
},
},
])
})
it("Adds job to queue with custom options passed directly upon emitting", () => {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "shared",
})
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
eventBus.emit(
"eventName",
{ hi: "1234" },
{ attempts: 3, backoff: 5000, delay: 1000 }
)
expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1)
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 3,
backoff: 5000,
delay: 1000,
removeOnComplete: true,
},
},
])
})
it("Adds job to queue with module job options", () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
removeOnComplete: {
age: 5,
},
attempts: 7,
},
},
{
resources: "shared",
}
)
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
eventBus.emit("eventName", { hi: "1234" })
expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1)
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 7,
removeOnComplete: {
age: 5,
},
},
},
])
})
it("Adds job to queue with default, local, and global options merged", () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
removeOnComplete: 5,
},
},
{
resources: "shared",
}
)
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
eventBus.emit("eventName", { hi: "1234" }, { delay: 1000 })
expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1)
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 1,
removeOnComplete: 5,
delay: 1000,
},
},
])
})
})
})
describe("worker_", () => {
let result
describe("Successfully processes the jobs", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "shared",
})
})
it("Processes a simple event with no options", async () => {
eventBus.subscribe("eventName", () => Promise.resolve("hi"))
result = await eventBus.worker_({
data: { eventName: "eventName", data: {} },
opts: { attempts: 1 },
})
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing eventName which has 1 subscribers"
)
expect(result).toEqual(["hi"])
})
it("Processes event with failing subscribers", async () => {
eventBus.subscribe("eventName", () => Promise.resolve("hi"))
eventBus.subscribe("eventName", () => Promise.reject("fail1"))
eventBus.subscribe("eventName", () => Promise.resolve("hi2"))
eventBus.subscribe("eventName", () => Promise.reject("fail2"))
result = await eventBus.worker_({
data: { eventName: "eventName", data: {} },
update: (data) => data,
opts: { attempts: 1 },
})
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing eventName which has 4 subscribers"
)
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail2"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
)
expect(result).toEqual(["hi", "fail1", "hi2", "fail2"])
})
it("Retries processing when subcribers fail, if configured - final attempt", async () => {
eventBus.subscribe("eventName", async () => Promise.resolve("hi"), {
subscriberId: "1",
})
eventBus.subscribe("eventName", async () => Promise.reject("fail1"), {
subscriberId: "2",
})
result = await eventBus
.worker_({
data: {
eventName: "eventName",
data: {},
completedSubscriberIds: ["1"],
},
attemptsMade: 2,
update: (data) => data,
opts: { attempts: 2 },
})
.catch((error) => void 0)
expect(loggerMock.warn).toHaveBeenCalledTimes(1)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.info).toHaveBeenCalledTimes(2)
expect(loggerMock.info).toHaveBeenCalledWith(
"Final retry attempt for eventName"
)
expect(loggerMock.info).toHaveBeenCalledWith(
"Retrying eventName which has 2 subscribers (1 of them failed)"
)
})
it("Retries processing when subcribers fail, if configured", async () => {
eventBus.subscribe("eventName", async () => Promise.resolve("hi"), {
subscriberId: "1",
})
eventBus.subscribe("eventName", async () => Promise.reject("fail1"), {
subscriberId: "2",
})
result = await eventBus
.worker_({
data: {
eventName: "eventName",
data: {},
completedSubscriberIds: ["1"],
},
attemptsMade: 2,
updateData: (data) => data,
opts: { attempts: 3 },
})
.catch((err) => void 0)
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying..."
)
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Retrying eventName which has 2 subscribers (1 of them failed)"
)
})
})
})
})

View File

@@ -0,0 +1,491 @@
import { Logger } from "@medusajs/types"
import { Queue, Worker } from "bullmq"
import { Redis } from "ioredis"
import RedisEventBusService from "../event-bus-redis"
// const redisURL = "redis://localhost:6379"
// const client = new Redis(6379, redisURL, {
// // Lazy connect to properly handle connection errors
// lazyConnect: true,
// maxRetriesPerRequest: 0,
// })
jest.genMockFromModule("bullmq")
jest.genMockFromModule("ioredis")
jest.mock("bullmq")
jest.mock("ioredis")
const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
} as unknown as Logger
const redisMock = {
del: () => jest.fn(),
rpush: () => jest.fn(),
lrange: () => jest.fn(),
disconnect: () => jest.fn(),
expire: () => jest.fn(),
} as unknown as Redis
const simpleModuleOptions = { redisUrl: "test-url" }
const moduleDeps = {
logger: loggerMock,
eventBusRedisConnection: redisMock,
}
describe("RedisEventBusService", () => {
let eventBus: RedisEventBusService
let queue
let redis
describe("constructor", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
resources: "shared",
})
})
it("Creates a queue + worker", () => {
expect(Queue).toHaveBeenCalledTimes(1)
expect(Queue).toHaveBeenCalledWith("events-queue", {
connection: expect.any(Object),
prefix: "RedisEventBusService",
})
expect(Worker).toHaveBeenCalledTimes(1)
expect(Worker).toHaveBeenCalledWith(
"events-queue",
expect.any(Function),
{
connection: expect.any(Object),
prefix: "RedisEventBusService",
}
)
})
it("Throws on isolated module declaration", () => {
try {
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "isolated",
scope: "internal",
})
} catch (error) {
expect(error.message).toEqual(
"At the moment this module can only be used with shared resources"
)
}
})
})
describe("emit", () => {
describe("Successfully emits events", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
scope: "internal",
resources: "shared",
})
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis = (eventBus as any).eventBusRedisConnection_
redis.rpush = jest.fn()
})
it("should add job to queue with default options", async () => {
await eventBus.emit([
{
eventName: "eventName",
data: {
hi: "1234",
},
},
])
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 1,
removeOnComplete: true,
},
},
])
})
it("should add job to queue with custom options passed directly upon emitting", async () => {
await eventBus.emit(
[{ eventName: "eventName", data: { hi: "1234" } }],
{ attempts: 3, backoff: 5000, delay: 1000 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 3,
backoff: 5000,
delay: 1000,
removeOnComplete: true,
},
},
])
})
it("should add job to queue with module job options", async () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
removeOnComplete: { age: 5 },
attempts: 7,
},
},
{
resources: "shared",
scope: "internal",
}
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
await eventBus.emit(
[
{
eventName: "eventName",
data: { hi: "1234" },
},
],
{ attempts: 3, backoff: 5000, delay: 1000 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 3,
backoff: 5000,
delay: 1000,
removeOnComplete: {
age: 5,
},
},
},
])
})
it("should add job to queue with default, local, and global options merged", async () => {
eventBus = new RedisEventBusService(
moduleDeps,
{
...simpleModuleOptions,
jobOptions: {
removeOnComplete: 5,
},
},
{
resources: "shared",
scope: "internal",
}
)
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
await eventBus.emit(
{
eventName: "eventName",
data: { hi: "1234" },
},
{ delay: 1000 }
)
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
opts: {
attempts: 1,
removeOnComplete: 5,
delay: 1000,
},
},
])
})
it("should successfully group events", async () => {
const options = { delay: 1000 }
const event = {
eventName: "eventName",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-1" },
}
const [builtEvent] = (eventBus as any).buildEvents([event], options)
await eventBus.emit(event, options)
expect(queue.addBulk).toHaveBeenCalledTimes(0)
expect(redis.rpush).toHaveBeenCalledTimes(1)
expect(redis.rpush).toHaveBeenCalledWith(
"staging:test-group-1",
JSON.stringify(builtEvent)
)
})
it("should successfully group, release and clear events", async () => {
const options = { delay: 1000 }
const events = [
{
eventName: "grouped-event-1",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-1" },
},
{
eventName: "ungrouped-event-2",
data: { hi: "1234" },
},
{
eventName: "grouped-event-2",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-2" },
},
{
eventName: "grouped-event-3",
data: { hi: "1235" },
metadata: { eventGroupId: "test-group-2" },
},
]
redis.del = jest.fn()
await eventBus.emit(events, options)
// Expect 1 event to have been send
// Expect 2 pushes to redis as there are 2 groups of events to push
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(redis.rpush).toHaveBeenCalledTimes(2)
expect(redis.del).not.toHaveBeenCalled()
const [testGroup1Event] = (eventBus as any).buildEvents(
[events[0]],
options
)
const [testGroup2Event] = (eventBus as any).buildEvents(
[events[2]],
options
)
const [testGroup2Event2] = (eventBus as any).buildEvents(
[events[3]],
options
)
redis.lrange = jest.fn((key) => {
if (key === "staging:test-group-1") {
return Promise.resolve([JSON.stringify(testGroup1Event)])
}
if (key === "staging:test-group-2") {
return Promise.resolve([
JSON.stringify(testGroup2Event),
JSON.stringify(testGroup2Event2),
])
}
})
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
await eventBus.releaseGroupedEvents("test-group-1")
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-1")
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis.del = jest.fn()
await eventBus.releaseGroupedEvents("test-group-2")
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
testGroup2Event,
testGroup2Event2,
])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-2")
})
})
})
describe("worker_", () => {
let result
describe("Successfully processes the jobs", () => {
beforeEach(async () => {
jest.clearAllMocks()
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
resources: "shared",
scope: "internal",
})
})
it("should process a simple event with no options", async () => {
const test: string[] = []
eventBus.subscribe("eventName", () => {
test.push("success")
return Promise.resolve()
})
// TODO: The typing for this is all over the place
await eventBus.worker_({
data: { eventName: "eventName", data: { test: 1 } },
opts: { attempts: 1 },
} as any)
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing eventName which has 1 subscribers"
)
expect(test).toEqual(["success"])
})
it("should process event with failing subscribers", async () => {
const test: string[] = []
eventBus.subscribe("eventName", () => {
test.push("hi")
return Promise.resolve()
})
eventBus.subscribe("eventName", () => {
test.push("fail1")
return Promise.reject("fail1")
})
eventBus.subscribe("eventName", () => {
test.push("hi2")
return Promise.resolve()
})
eventBus.subscribe("eventName", () => {
test.push("fail2")
return Promise.reject("fail2")
})
result = await eventBus.worker_({
data: { eventName: "eventName", data: { test: 1 } },
opts: { attempts: 1 },
update: (data) => data,
} as any)
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Processing eventName which has 4 subscribers"
)
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail2"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
)
expect(test.sort()).toEqual(["hi", "fail1", "hi2", "fail2"].sort())
})
it("should retry processing when subcribers fail, if configured - final attempt", async () => {
eventBus.subscribe("eventName", async () => Promise.resolve(), {
subscriberId: "1",
})
eventBus.subscribe("eventName", async () => Promise.reject("fail1"), {
subscriberId: "2",
})
result = await eventBus
.worker_({
data: {
eventName: "eventName",
data: {},
completedSubscriberIds: ["1"],
},
attemptsMade: 2,
update: (data) => data,
opts: { attempts: 2 },
} as any)
.catch((error) => void 0)
expect(loggerMock.warn).toHaveBeenCalledTimes(1)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.info).toHaveBeenCalledTimes(2)
expect(loggerMock.info).toHaveBeenCalledWith(
"Final retry attempt for eventName"
)
expect(loggerMock.info).toHaveBeenCalledWith(
"Retrying eventName which has 2 subscribers (1 of them failed)"
)
})
it("should retry processing when subcribers fail, if configured", async () => {
eventBus.subscribe("eventName", async () => Promise.resolve(), {
subscriberId: "1",
})
eventBus.subscribe("eventName", async () => Promise.reject("fail1"), {
subscriberId: "2",
})
result = await eventBus
.worker_({
data: {
eventName: "eventName",
data: {},
completedSubscriberIds: ["1"],
},
attemptsMade: 2,
updateData: (data) => data,
opts: { attempts: 3 },
} as any)
.catch((err) => void 0)
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
expect(loggerMock.warn).toHaveBeenCalledWith(
"An error occurred while processing eventName: fail1"
)
expect(loggerMock.warn).toHaveBeenCalledWith(
"One or more subscribers of eventName failed. Retrying..."
)
expect(loggerMock.info).toHaveBeenCalledTimes(1)
expect(loggerMock.info).toHaveBeenCalledWith(
"Retrying eventName which has 2 subscribers (1 of them failed)"
)
})
})
})
})

View File

@@ -1,15 +1,25 @@
import { InternalModuleDeclaration } from "@medusajs/modules-sdk"
import { EmitData, Logger, Message } from "@medusajs/types"
import { AbstractEventBusModuleService, isString } from "@medusajs/utils"
import { BulkJobOptions, JobsOptions, Queue, Worker } from "bullmq"
import { Logger, Message, MessageBody } from "@medusajs/types"
import {
AbstractEventBusModuleService,
isPresent,
promiseAll,
} from "@medusajs/utils"
import { BulkJobOptions, Queue, Worker } from "bullmq"
import { Redis } from "ioredis"
import { BullJob, EmitOptions, EventBusRedisModuleOptions } from "../types"
import { BullJob, EventBusRedisModuleOptions } from "../types"
type InjectedDependencies = {
logger: Logger
eventBusRedisConnection: Redis
}
type IORedisEventType<T = unknown> = {
name: string
data: MessageBody<T>
opts: BulkJobOptions
}
/**
* Can keep track of multiple subscribers to different events and run the
* subscribers when events happen. Events will run asynchronously.
@@ -71,78 +81,137 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
},
}
/**
* Emit a single event
* @param {string} eventName - the name of the event to be process.
* @param data - the data to send to the subscriber.
* @param options - options to add the job with
*/
async emit<T>(
eventName: string,
data: T,
options: Record<string, unknown>
): Promise<void>
/**
* Emit a number of events
* @param {EmitData} data - the data to send to the subscriber.
*/
async emit<T>(data: EmitData<T>[]): Promise<void>
async emit<T>(data: Message<T>[]): Promise<void>
async emit<T, TInput extends string | EmitData<T>[] | Message<T>[] = string>(
eventNameOrData: TInput,
data?: T,
options: BulkJobOptions | JobsOptions = {}
): Promise<void> {
const globalJobOptions = this.moduleOptions_.jobOptions ?? {}
const isBulkEmit = Array.isArray(eventNameOrData)
private buildEvents<T>(
eventsData: Message<T>[],
options: BulkJobOptions = {}
): IORedisEventType<T>[] {
const opts = {
// default options
removeOnComplete: true,
attempts: 1,
// global options
...globalJobOptions,
} as EmitOptions
...(this.moduleOptions_.jobOptions ?? {}),
...options,
}
const dataBody = isString(eventNameOrData)
? data ?? (data as Message<T>).body
: undefined
return eventsData.map((eventData) => {
const { options, ...eventBody } = eventData
const events = isBulkEmit
? eventNameOrData.map((event) => ({
name: event.eventName,
data: {
eventName: event.eventName,
data: (event as EmitData).data ?? (event as Message<T>).body,
},
opts: {
...opts,
// local options
...event.options,
},
}))
: [
{
name: eventNameOrData as string,
data: { eventName: eventNameOrData, data: dataBody },
opts: {
...opts,
// local options
...options,
},
},
]
await this.queue_.addBulk(events)
return {
name: eventData.eventName,
data: eventBody,
opts: {
// options for event group
...opts,
// options for a particular event
...options,
},
}
})
}
// TODO: Implement redis based staging + release
async releaseGroupedEvents(eventGroupId: string) {}
async clearGroupedEvents(eventGroupId: string) {}
/**
* Emit a single or number of events
* @param {Message} data - the data to send to the subscriber.
* @param {BulkJobOptions} data - the options to add to bull mq
*/
async emit<T = unknown>(
eventsData: Message<T> | Message<T>[],
options: BulkJobOptions & { groupedEventsTTL?: number } = {}
): Promise<void> {
let eventsDataArray = Array.isArray(eventsData) ? eventsData : [eventsData]
const { groupedEventsTTL = 600 } = options
delete options.groupedEventsTTL
const eventsToEmit = eventsDataArray.filter(
(eventData) => !isPresent(eventData.metadata?.eventGroupId)
)
const eventsToGroup = eventsDataArray.filter((eventData) =>
isPresent(eventData.metadata?.eventGroupId)
)
const groupEventsMap = new Map<string, Message<T>[]>()
for (const event of eventsToGroup) {
const groupId = event.metadata?.eventGroupId!
const array = groupEventsMap.get(groupId) ?? []
array.push(event)
groupEventsMap.set(groupId, array)
}
const promises: Promise<unknown>[] = []
if (eventsToEmit.length) {
const emitData = this.buildEvents(eventsToEmit, options)
promises.push(this.queue_.addBulk(emitData))
}
for (const [groupId, events] of groupEventsMap.entries()) {
if (!events?.length) {
continue
}
// Set a TTL for the key of the list that is scoped to a group
// This will be helpful in preventing stale data from staying in redis for too long
// in the event the module fails to cleanup events. For long running workflows, setting a much higher
// TTL or even skipping the TTL would be required
this.setExpire(groupId, groupedEventsTTL)
const eventsData = this.buildEvents(events, options)
promises.push(this.groupEvents(groupId, eventsData))
}
await promiseAll(promises)
}
private async setExpire(eventGroupId: string, ttl: number) {
if (!eventGroupId) {
return
}
await this.eventBusRedisConnection_.expire(`staging:${eventGroupId}`, ttl)
}
private async groupEvents<T = unknown>(
eventGroupId: string,
events: IORedisEventType<T>[]
) {
await this.eventBusRedisConnection_.rpush(
`staging:${eventGroupId}`,
...events.map((event) => JSON.stringify(event))
)
}
private async getGroupedEvents(
eventGroupId: string
): Promise<IORedisEventType[]> {
return await this.eventBusRedisConnection_
.lrange(`staging:${eventGroupId}`, 0, -1)
.then((result) => {
return result.map((jsonString) => JSON.parse(jsonString))
})
}
async releaseGroupedEvents(eventGroupId: string) {
const groupedEvents = await this.getGroupedEvents(eventGroupId)
await this.queue_.addBulk(groupedEvents)
await this.clearGroupedEvents(eventGroupId)
}
async clearGroupedEvents(eventGroupId: string) {
if (!eventGroupId) {
return
}
await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`)
}
/**
* Handles incoming jobs.