chore: group & release events for local eventbus (#7649)

* chore: stage & release events for local eventbus

* chore: address review

* chore: mock emitter correctly
This commit is contained in:
Riqwan Thamir
2024-06-07 15:31:16 +02:00
committed by GitHub
parent c9c2b6c88f
commit fbb00f3863
5 changed files with 237 additions and 71 deletions

View File

@@ -24,6 +24,16 @@ export abstract class AbstractEventBusModuleService
abstract emit<T>(data: EventBusTypes.EmitData<T>[]): Promise<void>
abstract emit<T>(data: EventBusTypes.Message<T>[]): Promise<void>
/*
Grouped events are useful when you have distributed transactions
where you need to explicitly group, release and clear events upon
lifecycle events of a transaction.
*/
// Given a eventGroupId, all the grouped events will be released
abstract releaseGroupedEvents(eventGroupId: string): Promise<void>
// Given a eventGroupId, all the grouped events will be cleared
abstract clearGroupedEvents(eventGroupId: string): Promise<void>
protected storeSubscribers({
event,
subscriberId,

View File

@@ -1,70 +0,0 @@
import LocalEventBusService from "../event-bus-local"
jest.genMockFromModule("events")
jest.mock("events")
const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
}
const moduleDeps = {
logger: loggerMock,
}
describe("LocalEventBusService", () => {
let eventBus
describe("emit", () => {
describe("Successfully emits events", () => {
beforeEach(() => {
jest.clearAllMocks()
})
it("Emits an event", () => {
eventBus = new LocalEventBusService(
moduleDeps,
{},
{
resources: "shared",
}
)
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.emit("eventName", { hi: "1234" })
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", {
hi: "1234",
})
})
it("Emits multiple events", () => {
eventBus = new LocalEventBusService(
moduleDeps,
{},
{
resources: "shared",
}
)
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.emit([
{ eventName: "event-1", data: { hi: "1234" } },
{ eventName: "event-2", data: { hi: "5678" } },
])
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
hi: "1234",
})
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", {
hi: "5678",
})
})
})
})
})

View File

@@ -0,0 +1,170 @@
import LocalEventBusService from "../event-bus-local"
jest.genMockFromModule("events")
jest.mock("events")
const loggerMock = {
info: jest.fn().mockReturnValue(console.log),
warn: jest.fn().mockReturnValue(console.log),
error: jest.fn().mockReturnValue(console.log),
}
const moduleDeps = {
logger: loggerMock,
}
describe("LocalEventBusService", () => {
let eventBus
describe("emit", () => {
describe("Successfully emits events", () => {
beforeEach(() => {
jest.clearAllMocks()
eventBus = new LocalEventBusService(moduleDeps as any)
})
it("should emit an event", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
await eventBus.emit("eventName", { hi: "1234" })
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("eventName", {
hi: "1234",
})
})
it("should emit multiple events", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
await eventBus.emit([
{ eventName: "event-1", data: { hi: "1234" } },
{ eventName: "event-2", data: { hi: "5678" } },
])
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
hi: "1234",
})
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", {
hi: "5678",
})
})
it("should group an event if data consists of eventGroupId", async () => {
const groupEventFn = jest.spyOn(eventBus, "groupEvent")
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
await eventBus.emit("test-event", {
test: "1234",
eventGroupId: "test",
})
expect(eventBus.eventEmitter_.emit).not.toHaveBeenCalled()
expect(groupEventFn).toHaveBeenCalledTimes(1)
expect(groupEventFn).toHaveBeenCalledWith("test", "test-event", {
test: "1234",
})
jest.clearAllMocks()
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.emit("test-event", { test: "1234", eventGroupId: "test" })
eventBus.emit("test-event", { test: "test-1" })
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(groupEventFn).toHaveBeenCalledTimes(1)
expect(eventBus.groupedEventsMap_.get("test")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
expect.objectContaining({ eventName: "test-event" }),
])
await eventBus.emit("test-event", {
test: "1234",
eventGroupId: "test-2",
})
expect(eventBus.groupedEventsMap_.get("test-2")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
])
})
it("should release events when requested with eventGroupId", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
await eventBus.emit([
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-1" },
},
{
eventName: "event-2",
data: { test: "2", eventGroupId: "group-1" },
},
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-2" },
},
{
eventName: "event-2",
data: { test: "2", eventGroupId: "group-2" },
},
{ eventName: "event-1", data: { test: "1" } },
])
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(1)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
test: "1",
})
expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(2)
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2)
jest.clearAllMocks()
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
eventBus.releaseGroupedEvents("group-1")
expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined()
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledTimes(2)
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-1", {
test: "1",
})
expect(eventBus.eventEmitter_.emit).toHaveBeenCalledWith("event-2", {
test: "2",
})
})
it("should clear events from grouped events when requested with eventGroupId", async () => {
eventBus.eventEmitter_.emit.mockImplementationOnce((data) => data)
await eventBus.emit([
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-1" },
},
{
eventName: "event-1",
data: { test: "1", eventGroupId: "group-2" },
},
])
expect(eventBus.groupedEventsMap_.get("group-1")).toHaveLength(1)
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1)
eventBus.clearGroupedEvents("group-1")
expect(eventBus.groupedEventsMap_.get("group-1")).not.toBeDefined()
expect(eventBus.groupedEventsMap_.get("group-2")).toHaveLength(1)
eventBus.clearGroupedEvents("group-2")
expect(eventBus.groupedEventsMap_.get("group-2")).not.toBeDefined()
})
})
})
})

View File

@@ -14,6 +14,8 @@ type InjectedDependencies = {
logger: Logger
}
type StagingQueueType = Map<string, { eventName: string; data?: unknown }[]>
const eventEmitter = new EventEmitter()
eventEmitter.setMaxListeners(Infinity)
@@ -21,6 +23,7 @@ eventEmitter.setMaxListeners(Infinity)
export default class LocalEventBusService extends AbstractEventBusModuleService {
protected readonly logger_?: Logger
protected readonly eventEmitter_: EventEmitter
protected groupedEventsMap_: StagingQueueType
constructor({ logger }: MedusaContainer & InjectedDependencies) {
// @ts-ignore
@@ -29,6 +32,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
this.logger_ = logger
this.eventEmitter_ = eventEmitter
this.groupedEventsMap_ = new Map()
}
async emit<T>(
@@ -70,10 +74,58 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
}
const data = (event as EmitData).data ?? (event as Message<T>).body
this.eventEmitter_.emit(event.eventName, data)
await this.groupOrEmitEvent(event.eventName, data)
}
}
// If the data of the event consists of a eventGroupId, we don't emit the event, instead
// we add them to a queue grouped by the eventGroupId and release them when
// explicitly requested.
// This is useful in the event of a distributed transaction where you'd want to emit
// events only once the transaction ends.
private async groupOrEmitEvent(
eventName: string,
data: unknown & { eventGroupId?: string }
) {
const { eventGroupId, ...eventData } = data
if (eventGroupId) {
await this.groupEvent(eventGroupId, eventName, eventData)
} else {
this.eventEmitter_.emit(eventName, data)
}
}
// Groups an event to a queue to be emitted upon explicit release
private async groupEvent(
eventGroupId: string,
eventName: string,
data: unknown
) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
groupedEvents.push({ eventName, data })
this.groupedEventsMap_.set(eventGroupId, groupedEvents)
}
async releaseGroupedEvents(eventGroupId: string) {
const groupedEvents = this.groupedEventsMap_.get(eventGroupId) || []
for (const event of groupedEvents) {
const { eventName, data } = event
this.eventEmitter_.emit(eventName, data)
}
this.clearGroupedEvents(eventGroupId)
}
async clearGroupedEvents(eventGroupId: string) {
this.groupedEventsMap_.delete(eventGroupId)
}
subscribe(event: string | symbol, subscriber: Subscriber): this {
const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })

View File

@@ -140,6 +140,10 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
await this.queue_.addBulk(events)
}
// TODO: Implement redis based staging + release
async releaseGroupedEvents(eventGroupId: string) {}
async clearGroupedEvents(eventGroupId: string) {}
/**
* Handles incoming jobs.
* @param job The job object