feat: restructure events payload (#8143)

* refactor: restructure events payload

Breaking change: This PR changes the event payload accepted by the event
listeners

* refactor: fix failing tests and implement feedback

* add integration tests

* fix timeout

---------

Co-authored-by: Adrien de Peretti <adrien.deperetti@gmail.com>
This commit is contained in:
Harminder Virk
2024-07-16 21:39:16 +05:30
committed by GitHub
parent 5813216c88
commit f579f0b3be
26 changed files with 194 additions and 111 deletions

View File

@@ -30,14 +30,14 @@ describe("LocalEventBusService", () => {
eventEmitter.emit = jest.fn((data) => data)
await eventBus.emit({
eventName: "eventName",
name: "eventName",
data: { hi: "1234" },
})
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("eventName", {
data: { hi: "1234" },
eventName: "eventName",
name: "eventName",
})
})
@@ -45,18 +45,18 @@ describe("LocalEventBusService", () => {
eventEmitter.emit = jest.fn((data) => data)
await eventBus.emit([
{ eventName: "event-1", data: { hi: "1234" } },
{ eventName: "event-2", data: { hi: "5678" } },
{ name: "event-1", data: { hi: "1234" } },
{ name: "event-2", data: { hi: "5678" } },
])
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { hi: "1234" },
eventName: "event-1",
name: "event-1",
})
expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", {
data: { hi: "5678" },
eventName: "event-2",
name: "event-2",
})
})
@@ -65,7 +65,7 @@ describe("LocalEventBusService", () => {
eventEmitter.emit = jest.fn((data) => data)
await eventBus.emit({
eventName: "test-event",
name: "test-event",
data: {
test: "1234",
},
@@ -79,7 +79,7 @@ describe("LocalEventBusService", () => {
expect(groupEventFn).toHaveBeenCalledWith("test", {
data: { test: "1234" },
metadata: { eventGroupId: "test" },
eventName: "test-event",
name: "test-event",
})
jest.clearAllMocks()
@@ -89,12 +89,12 @@ describe("LocalEventBusService", () => {
eventBus.emit([
{
eventName: "test-event",
name: "test-event",
data: { test: "1234" },
metadata: { eventGroupId: "test" },
},
{
eventName: "test-event",
name: "test-event",
data: { test: "test-1" },
},
])
@@ -102,18 +102,18 @@ describe("LocalEventBusService", () => {
expect(groupEventFn).toHaveBeenCalledTimes(1)
expect((eventBus as any).groupedEventsMap_.get("test")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
expect.objectContaining({ eventName: "test-event" }),
expect.objectContaining({ name: "test-event" }),
expect.objectContaining({ name: "test-event" }),
])
await eventBus.emit({
eventName: "test-event",
name: "test-event",
data: { test: "1234" },
metadata: { eventGroupId: "test-2" },
})
expect((eventBus as any).groupedEventsMap_.get("test-2")).toEqual([
expect.objectContaining({ eventName: "test-event" }),
expect.objectContaining({ name: "test-event" }),
])
})
@@ -122,32 +122,32 @@ describe("LocalEventBusService", () => {
await eventBus.emit([
{
eventName: "event-1",
name: "event-1",
data: { test: "1" },
metadata: { eventGroupId: "group-1" },
},
{
eventName: "event-2",
name: "event-2",
data: { test: "2" },
metadata: { eventGroupId: "group-1" },
},
{
eventName: "event-1",
name: "event-1",
data: { test: "1" },
metadata: { eventGroupId: "group-2" },
},
{
eventName: "event-2",
name: "event-2",
data: { test: "2" },
metadata: { eventGroupId: "group-2" },
},
{ eventName: "event-1", data: { test: "1" } },
{ name: "event-1", data: { test: "1" } },
])
expect(eventEmitter.emit).toHaveBeenCalledTimes(1)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { test: "1" },
eventName: "event-1",
name: "event-1",
})
expect((eventBus as any).groupedEventsMap_.get("group-1")).toHaveLength(
@@ -171,12 +171,12 @@ describe("LocalEventBusService", () => {
expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenCalledWith("event-1", {
data: { test: "1" },
eventName: "event-1",
name: "event-1",
metadata: { eventGroupId: "group-1" },
})
expect(eventEmitter.emit).toHaveBeenCalledWith("event-2", {
data: { test: "2" },
eventName: "event-2",
name: "event-2",
metadata: { eventGroupId: "group-1" },
})
})
@@ -187,12 +187,12 @@ describe("LocalEventBusService", () => {
await eventBus.emit([
{
eventName: "event-1",
name: "event-1",
data: { test: "1" },
metadata: { eventGroupId: "group-1" },
},
{
eventName: "event-1",
name: "event-1",
data: { test: "1" },
metadata: { eventGroupId: "group-2" },
},

View File

@@ -3,7 +3,7 @@ import {
EventBusTypes,
Logger,
Message,
MessageBody,
Event,
Subscriber,
} from "@medusajs/types"
import { AbstractEventBusModuleService } from "@medusajs/utils"
@@ -45,11 +45,11 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
for (const eventData of normalizedEventsData) {
const eventListenersCount = this.eventEmitter_.listenerCount(
eventData.eventName
eventData.name
)
this.logger_?.info(
`Processing ${eventData.eventName} which has ${eventListenersCount} subscribers`
`Processing ${eventData.name} which has ${eventListenersCount} subscribers`
)
if (eventListenersCount === 0) {
@@ -73,7 +73,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
await this.groupEvent(eventGroupId, eventData)
} else {
const { options, ...eventBody } = eventData
this.eventEmitter_.emit(eventData.eventName, eventBody)
this.eventEmitter_.emit(eventData.name, eventBody)
}
}
@@ -95,7 +95,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
for (const event of groupedEvents) {
const { options, ...eventBody } = event
this.eventEmitter_.emit(event.eventName, eventBody)
this.eventEmitter_.emit(event.name, eventBody)
}
this.clearGroupedEvents(eventGroupId)
@@ -108,7 +108,7 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
subscribe(event: string | symbol, subscriber: Subscriber): this {
const randId = ulid()
this.storeSubscribers({ event, subscriberId: randId, subscriber })
this.eventEmitter_.on(event, async (data: MessageBody) => {
this.eventEmitter_.on(event, async (data: Event) => {
try {
await subscriber(data)
} catch (e) {

View File

@@ -101,7 +101,7 @@ describe("RedisEventBusService", () => {
it("should add job to queue with default options", async () => {
await eventBus.emit([
{
eventName: "eventName",
name: "eventName",
data: {
hi: "1234",
},
@@ -112,7 +112,7 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
data: { hi: "1234" },
opts: {
attempts: 1,
removeOnComplete: true,
@@ -122,16 +122,17 @@ describe("RedisEventBusService", () => {
})
it("should add job to queue with custom options passed directly upon emitting", async () => {
await eventBus.emit(
[{ eventName: "eventName", data: { hi: "1234" } }],
{ attempts: 3, backoff: 5000, delay: 1000 }
)
await eventBus.emit([{ name: "eventName", data: { hi: "1234" } }], {
attempts: 3,
backoff: 5000,
delay: 1000,
})
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
data: { hi: "1234" },
opts: {
attempts: 3,
backoff: 5000,
@@ -164,7 +165,7 @@ describe("RedisEventBusService", () => {
await eventBus.emit(
[
{
eventName: "eventName",
name: "eventName",
data: { hi: "1234" },
},
],
@@ -175,7 +176,7 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
data: { hi: "1234" },
opts: {
attempts: 3,
backoff: 5000,
@@ -208,7 +209,7 @@ describe("RedisEventBusService", () => {
await eventBus.emit(
{
eventName: "eventName",
name: "eventName",
data: { hi: "1234" },
},
{ delay: 1000 }
@@ -218,7 +219,7 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledWith([
{
name: "eventName",
data: { eventName: "eventName", data: { hi: "1234" } },
data: { hi: "1234" },
opts: {
attempts: 1,
removeOnComplete: 5,
@@ -231,7 +232,7 @@ describe("RedisEventBusService", () => {
it("should successfully group events", async () => {
const options = { delay: 1000 }
const event = {
eventName: "eventName",
name: "eventName",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-1" },
}
@@ -252,21 +253,21 @@ describe("RedisEventBusService", () => {
const options = { delay: 1000 }
const events = [
{
eventName: "grouped-event-1",
name: "grouped-event-1",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-1" },
},
{
eventName: "ungrouped-event-2",
name: "ungrouped-event-2",
data: { hi: "1234" },
},
{
eventName: "grouped-event-2",
name: "grouped-event-2",
data: { hi: "1234" },
metadata: { eventGroupId: "test-group-2" },
},
{
eventName: "grouped-event-3",
name: "grouped-event-3",
data: { hi: "1235" },
metadata: { eventGroupId: "test-group-2" },
},

View File

@@ -1,5 +1,5 @@
import { InternalModuleDeclaration } from "@medusajs/modules-sdk"
import { Logger, Message, MessageBody } from "@medusajs/types"
import { Logger, Message, Event } from "@medusajs/types"
import {
AbstractEventBusModuleService,
isPresent,
@@ -14,9 +14,7 @@ type InjectedDependencies = {
eventBusRedisConnection: Redis
}
type IORedisEventType<T = unknown> = {
name: string
data: MessageBody<T>
type IORedisEventType<T = unknown> = Event<T> & {
opts: BulkJobOptions
}
@@ -98,8 +96,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
const { options, ...eventBody } = eventData
return {
name: eventData.eventName,
data: eventBody,
...eventBody,
opts: {
// options for event group
...opts,

View File

@@ -9,7 +9,7 @@ export function buildExpectedEventMessageShape(options: {
options?: Record<string, unknown>
}): EventBusTypes.Message {
return {
eventName: options.eventName,
name: options.eventName,
metadata: {
action: options.action,
eventGroupId: options.eventGroupId,

View File

@@ -207,7 +207,7 @@ export default class LinkModuleService<TLink> implements ILinkModule {
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(data as { id: unknown }[]).map(({ id }) => ({
eventName: this.entityName_ + "." + CommonEvents.ATTACHED,
name: this.entityName_ + "." + CommonEvents.ATTACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.ATTACHED,
@@ -258,7 +258,7 @@ export default class LinkModuleService<TLink> implements ILinkModule {
const allData = Array.isArray(data) ? data : [data]
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
allData.map(({ id }) => ({
eventName: this.entityName_ + "." + CommonEvents.DETACHED,
name: this.entityName_ + "." + CommonEvents.DETACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.DETACHED,
@@ -307,7 +307,7 @@ export default class LinkModuleService<TLink> implements ILinkModule {
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(deletedEntities as { id: string }[]).map(({ id }) => ({
eventName: this.entityName_ + "." + CommonEvents.DETACHED,
name: this.entityName_ + "." + CommonEvents.DETACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.DETACHED,
@@ -365,7 +365,7 @@ export default class LinkModuleService<TLink> implements ILinkModule {
await this.eventBusModuleService_?.emit<Record<string, unknown>>(
(restoredEntities as { id: string }[]).map(({ id }) => ({
eventName: this.entityName_ + "." + CommonEvents.ATTACHED,
name: this.entityName_ + "." + CommonEvents.ATTACHED,
metadata: {
source: this.serviceName_,
action: CommonEvents.ATTACHED,

View File

@@ -681,7 +681,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: productCategoryOne.id },
eventName: "product-category.deleted",
name: "product-category.deleted",
metadata: {
action: "",
object: "",

View File

@@ -280,7 +280,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledTimes(1)
expect(eventBusSpy).toHaveBeenCalledWith([
{
eventName: "product-collection.deleted",
name: "product-collection.deleted",
data: { id: collectionId },
metadata: {
action: "",
@@ -309,7 +309,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
{
data: { id: collectionId },
eventName: "product-collection.updated",
name: "product-collection.updated",
},
])
})
@@ -505,7 +505,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
{
data: { id: collections[0].id },
eventName: "product-collection.created",
name: "product-collection.created",
},
])
})

View File

@@ -283,7 +283,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledTimes(1)
expect(eventBusSpy).toHaveBeenCalledWith([
{
eventName: "product.updated",
name: "product.updated",
data: { id: productOne.id },
},
])
@@ -631,7 +631,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledTimes(1)
expect(eventBusSpy).toHaveBeenCalledWith([
{
eventName: "product.created",
name: "product.created",
data: { id: products[0].id },
},
])
@@ -721,7 +721,7 @@ moduleIntegrationTestRunner<IProductModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
{
eventName: "product.created",
name: "product.created",
data: { id: products[0].id },
},
])

View File

@@ -873,7 +873,7 @@ export default class ProductModuleService
await this.eventBusModuleService_?.emit<ProductCollectionEventData>(
collections.map(({ id }) => ({
eventName: ProductCollectionEvents.COLLECTION_CREATED,
name: ProductCollectionEvents.COLLECTION_CREATED,
data: { id },
}))
)
@@ -946,7 +946,7 @@ export default class ProductModuleService
if (created.length) {
await this.eventBusModuleService_?.emit<ProductCollectionEventData>(
created.map(({ id }) => ({
eventName: ProductCollectionEvents.COLLECTION_CREATED,
name: ProductCollectionEvents.COLLECTION_CREATED,
data: { id },
}))
)
@@ -955,7 +955,7 @@ export default class ProductModuleService
if (updated.length) {
await this.eventBusModuleService_?.emit<ProductCollectionEventData>(
updated.map(({ id }) => ({
eventName: ProductCollectionEvents.COLLECTION_UPDATED,
name: ProductCollectionEvents.COLLECTION_UPDATED,
data: { id },
}))
)
@@ -1016,7 +1016,7 @@ export default class ProductModuleService
await this.eventBusModuleService_?.emit<ProductCollectionEventData>(
updatedCollections.map(({ id }) => ({
eventName: ProductCollectionEvents.COLLECTION_UPDATED,
name: ProductCollectionEvents.COLLECTION_UPDATED,
data: { id },
}))
)
@@ -1279,7 +1279,7 @@ export default class ProductModuleService
await this.eventBusModuleService_?.emit<ProductEventData>(
createdProducts.map(({ id }) => ({
eventName: ProductEvents.PRODUCT_CREATED,
name: ProductEvents.PRODUCT_CREATED,
data: { id },
}))
)
@@ -1327,7 +1327,7 @@ export default class ProductModuleService
if (created.length) {
await this.eventBusModuleService_?.emit<ProductEventData>(
created.map(({ id }) => ({
eventName: ProductEvents.PRODUCT_CREATED,
name: ProductEvents.PRODUCT_CREATED,
data: { id },
}))
)
@@ -1336,7 +1336,7 @@ export default class ProductModuleService
if (updated.length) {
await this.eventBusModuleService_?.emit<ProductEventData>(
updated.map(({ id }) => ({
eventName: ProductEvents.PRODUCT_UPDATED,
name: ProductEvents.PRODUCT_UPDATED,
data: { id },
}))
)
@@ -1390,7 +1390,7 @@ export default class ProductModuleService
await this.eventBusModuleService_?.emit<ProductEventData>(
updatedProducts.map(({ id }) => ({
eventName: ProductEvents.PRODUCT_UPDATED,
name: ProductEvents.PRODUCT_UPDATED,
data: { id },
}))
)

View File

@@ -175,7 +175,7 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.INVITE_UPDATED,
name: UserEvents.INVITE_UPDATED,
}),
])
})
@@ -192,7 +192,7 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.INVITE_TOKEN_GENERATED,
name: UserEvents.INVITE_TOKEN_GENERATED,
}),
])
})
@@ -221,19 +221,19 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.INVITE_CREATED,
name: UserEvents.INVITE_CREATED,
}),
expect.objectContaining({
data: { id: "2" },
eventName: UserEvents.INVITE_CREATED,
name: UserEvents.INVITE_CREATED,
}),
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.INVITE_TOKEN_GENERATED,
name: UserEvents.INVITE_TOKEN_GENERATED,
}),
expect.objectContaining({
data: { id: "2" },
eventName: UserEvents.INVITE_TOKEN_GENERATED,
name: UserEvents.INVITE_TOKEN_GENERATED,
}),
])
})

View File

@@ -220,7 +220,7 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.USER_UPDATED,
name: UserEvents.USER_UPDATED,
}),
])
})
@@ -250,11 +250,11 @@ moduleIntegrationTestRunner<IUserModuleService>({
expect(eventBusSpy).toHaveBeenCalledWith([
expect.objectContaining({
data: { id: "1" },
eventName: UserEvents.USER_CREATED,
name: UserEvents.USER_CREATED,
}),
expect.objectContaining({
data: { id: "2" },
eventName: UserEvents.USER_CREATED,
name: UserEvents.USER_CREATED,
}),
])
})