feat(medusa): Bulk emit events (#3407)

This commit is contained in:
Adrien de Peretti
2023-03-13 15:28:51 +01:00
committed by GitHub
parent 601d20e7ab
commit f0a1355feb
14 changed files with 404 additions and 166 deletions

View File

@@ -0,0 +1,7 @@
---
"@medusajs/inventory": patch
"medusa-test-utils": patch
"@medusajs/medusa": patch
---
feat(medusa): Bulk emit events

View File

@@ -3,7 +3,7 @@ const path = require("path")
const { bootstrapApp } = require("../../../helpers/bootstrap-app")
const { initDb, useDb } = require("../../../helpers/use-db")
jest.setTimeout(30000)
jest.setTimeout(50000)
describe("Inventory Module", () => {
let appContainer

View File

@@ -1,17 +1,16 @@
import { EntityManager } from "typeorm"
import { isDefined, MedusaError } from "medusa-core-utils"
import {
FindConfig,
buildQuery,
IEventBusService,
FilterableReservationItemProps,
CreateReservationItemInput,
FilterableReservationItemProps,
FindConfig,
IEventBusService,
TransactionBaseService,
UpdateReservationItemInput,
} from "@medusajs/medusa"
import { ReservationItem } from "../models"
import { CONNECTION_NAME } from "../config"
import { InventoryLevelService } from "."
type InjectedDependencies = {
@@ -278,10 +277,12 @@ export default class ReservationItemService extends TransactionBaseService {
item.quantity * -1
),
])
})
await this.eventBusService_.emit(ReservationItemService.Events.DELETED, {
id: reservationItemId,
await this.eventBusService_
.withTransaction(manager)
.emit(ReservationItemService.Events.DELETED, {
id: reservationItemId,
})
})
}
}

View File

@@ -12,6 +12,8 @@ class MockRepo {
save,
findAndCount,
del,
count,
insertBulk
}) {
this.create_ = create;
this.update_ = update;
@@ -25,12 +27,19 @@ class MockRepo {
this.save_ = save;
this.findAndCount_ = findAndCount;
this.findOneWithRelations_ = findOneWithRelations;
this.insertBulk_ = insertBulk;
}
setFindOne(fn) {
this.findOne_ = fn;
}
insertBulk = jest.fn().mockImplementation((...args) => {
if (this.insertBulk_) {
return this.insertBulk_(...args)
}
return {}
})
create = jest.fn().mockImplementation((...args) => {
if (this.create_) {
return this.create_(...args);

View File

@@ -1,5 +1,28 @@
import { EntityRepository, Repository } from "typeorm"
import { StagedJob } from "../models/staged-job"
import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity"
import { StagedJob } from "../models"
import { rowSqlResultsToEntityTransformer } from "../utils"
@EntityRepository(StagedJob)
export class StagedJobRepository extends Repository<StagedJob> {}
export class StagedJobRepository extends Repository<StagedJob> {
async insertBulk(jobToCreates: QueryDeepPartialEntity<StagedJob>[]) {
const queryBuilder = this.createQueryBuilder()
.insert()
.into(StagedJob)
.values(jobToCreates)
// TODO: remove if statement once this issue is resolved https://github.com/typeorm/typeorm/issues/9850
if (!queryBuilder.connection.driver.isReturningSqlSupported("insert")) {
const rawStagedJobs = await queryBuilder.execute()
return rawStagedJobs.generatedMaps
}
const rawStagedJobs = await queryBuilder.returning("*").execute()
return rowSqlResultsToEntityTransformer(
rawStagedJobs.raw,
queryBuilder,
this.queryRunner!
)
}
}

View File

@@ -114,79 +114,174 @@ describe("EventBusService", () => {
})
describe("emit", () => {
let eventBus
let job
const eventName = "eventName"
const defaultOptions = {
attempts: 1,
removeOnComplete: true,
}
const data = { hi: "1234" }
const bulkData = [{ hi: "1234" }, { hi: "12345" }]
const mockManager = MockManager
describe("successfully adds job to queue", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
let eventBus
let stagedJobRepository
beforeEach(() => {
stagedJobRepository = MockRepository({
insertBulk: async (data) => data,
create: (data) => data,
})
eventBus = new EventBusService({
logger: loggerMock,
manager: MockManager,
manager: mockManager,
stagedJobRepository,
})
eventBus.queue_.add.mockImplementationOnce(() => "hi")
job = eventBus.emit("eventName", { hi: "1234" })
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
})
afterAll(async () => {
afterEach(async () => {
await eventBus.stopEnqueuer()
jest.clearAllMocks()
})
it("calls queue.add", () => {
expect(eventBus.queue_.add).toHaveBeenCalled()
it("calls queue.addBulk", async () => {
await eventBus.emit(eventName, data)
expect(eventBus.queue_.addBulk).toHaveBeenCalled()
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
data: {
data,
eventName,
},
opts: defaultOptions,
},
])
})
it("calls stagedJob repository insertBulk", async () => {
await eventBus.withTransaction(mockManager).emit(eventName, data)
expect(stagedJobRepository.create).toHaveBeenCalled()
expect(stagedJobRepository.create).toHaveBeenCalledWith({
event_name: eventName,
data: data,
options: defaultOptions,
})
expect(stagedJobRepository.insertBulk).toHaveBeenCalled()
expect(stagedJobRepository.insertBulk).toHaveBeenCalledWith([
{
event_name: eventName,
data,
options: defaultOptions,
},
])
})
})
describe("successfully adds job to queue with local options", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
describe("successfully adds jobs in bulk to queue", () => {
let eventBus
let stagedJobRepository
beforeEach(() => {
stagedJobRepository = MockRepository({
insertBulk: async (data) => data,
create: (data) => data,
})
eventBus = new EventBusService({
logger: loggerMock,
manager: MockManager,
manager: mockManager,
stagedJobRepository,
})
eventBus.queue_.add.mockImplementationOnce(() => "hi")
job = eventBus.emit(
"eventName",
{ hi: "1234" },
{ removeOnComplete: 100 }
)
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
})
afterAll(async () => {
afterEach(async () => {
jest.clearAllMocks()
await eventBus.stopEnqueuer()
})
it("calls queue.add", () => {
expect(eventBus.queue_.add).toHaveBeenCalled()
expect(eventBus.queue_.add).toHaveBeenCalledWith(
{ eventName: "eventName", data: { hi: "1234" } },
{ removeOnComplete: 100 }
)
it("calls queue.addBulk", async () => {
await eventBus.emit([
{ eventName, data: bulkData[0] },
{ eventName, data: bulkData[1] },
])
expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1)
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
data: {
data: bulkData[0],
eventName,
},
opts: defaultOptions,
},
{
data: {
data: bulkData[1],
eventName,
},
opts: defaultOptions,
},
])
})
it("calls stagedJob repository insertBulk", async () => {
await eventBus.withTransaction(mockManager).emit([
{ eventName, data: bulkData[0] },
{ eventName, data: bulkData[1] },
])
expect(stagedJobRepository.create).toHaveBeenCalledTimes(2)
expect(stagedJobRepository.create).toHaveBeenNthCalledWith(1, {
data: bulkData[0],
event_name: eventName,
options: defaultOptions,
})
expect(stagedJobRepository.create).toHaveBeenNthCalledWith(2, {
data: bulkData[1],
event_name: eventName,
options: defaultOptions,
})
expect(stagedJobRepository.insertBulk).toHaveBeenCalledTimes(1)
expect(stagedJobRepository.insertBulk).toHaveBeenCalledWith([
{
data: bulkData[0],
event_name: eventName,
options: defaultOptions,
},
{
data: bulkData[1],
event_name: eventName,
options: defaultOptions,
},
])
})
})
describe("successfully adds job to queue with global options", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
let eventBus
let stagedJobRepository
beforeEach(() => {
stagedJobRepository = MockRepository({
insertBulk: async (data) => data,
create: (data) => data,
})
eventBus = new EventBusService(
{
logger: loggerMock,
manager: MockManager,
manager: mockManager,
stagedJobRepository,
},
{
@@ -194,58 +289,78 @@ describe("EventBusService", () => {
}
)
eventBus.queue_.add.mockImplementationOnce(() => "hi")
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
job = eventBus.emit("eventName", { hi: "1234" })
eventBus.emit(eventName, data)
})
afterAll(async () => {
afterEach(async () => {
jest.clearAllMocks()
await eventBus.stopEnqueuer()
})
it("calls queue.add", () => {
expect(eventBus.queue_.add).toHaveBeenCalled()
expect(eventBus.queue_.add).toHaveBeenCalledWith(
{ eventName: "eventName", data: { hi: "1234" } },
{ removeOnComplete: 10, attempts: 1 }
)
it("calls queue.addBulk", () => {
expect(eventBus.queue_.addBulk).toHaveBeenCalled()
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
data: {
data,
eventName,
},
opts: { removeOnComplete: 10, attempts: 1 },
},
])
})
})
describe("successfully adds job to queue with default options", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
let eventBus
let stagedJobRepository
beforeEach(() => {
stagedJobRepository = MockRepository({
insertBulk: async (data) => data,
create: (data) => data,
})
eventBus = new EventBusService({
logger: loggerMock,
manager: MockManager,
manager: mockManager,
stagedJobRepository,
})
eventBus.queue_.add.mockImplementationOnce(() => "hi")
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
job = eventBus.emit("eventName", { hi: "1234" })
eventBus.emit(eventName, data)
})
afterAll(async () => {
afterEach(async () => {
jest.clearAllMocks()
await eventBus.stopEnqueuer()
})
it("calls queue.add", () => {
expect(eventBus.queue_.add).toHaveBeenCalled()
expect(eventBus.queue_.add).toHaveBeenCalledWith(
{ eventName: "eventName", data: { hi: "1234" } },
{ removeOnComplete: true, attempts: 1 }
)
it("calls queue.addBulk", () => {
expect(eventBus.queue_.addBulk).toHaveBeenCalled()
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
data: {
data,
eventName,
},
opts: { removeOnComplete: true, attempts: 1 },
},
])
})
})
describe("successfully adds job to queue with local options and global options merged", () => {
beforeAll(() => {
jest.resetAllMocks()
const stagedJobRepository = MockRepository({
find: () => Promise.resolve([]),
let eventBus
let stagedJobRepository
beforeEach(() => {
stagedJobRepository = MockRepository({
insertBulk: async (data) => data,
create: (data) => data,
})
eventBus = new EventBusService(
@@ -259,29 +374,36 @@ describe("EventBusService", () => {
}
)
eventBus.queue_.add.mockImplementationOnce(() => "hi")
eventBus.queue_.addBulk.mockImplementationOnce(() => "hi")
job = eventBus.emit(
"eventName",
{ hi: "1234" },
{ attempts: 10, delay: 1000, backoff: { type: "exponential" } }
)
eventBus.emit(eventName, data, {
attempts: 10,
delay: 1000,
backoff: { type: "exponential" },
})
})
afterAll(async () => {
afterEach(async () => {
jest.clearAllMocks()
await eventBus.stopEnqueuer()
})
it("calls queue.add", () => {
expect(eventBus.queue_.add).toHaveBeenCalled()
expect(eventBus.queue_.add).toHaveBeenCalledWith(
{ eventName: "eventName", data: { hi: "1234" } },
expect(eventBus.queue_.addBulk).toHaveBeenCalled()
expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([
{
removeOnComplete: 10, // global option
attempts: 10, // local option
delay: 1000, // local option
backoff: { type: "exponential" }, // local option
}
)
data: {
data,
eventName,
},
opts: {
removeOnComplete: 10, // global option
attempts: 10, // local option
delay: 1000, // local option
backoff: { type: "exponential" }, // local option
},
},
])
})
})
})

View File

@@ -1,8 +1,8 @@
import { IdMap, MockManager, MockRepository } from "medusa-test-utils"
import OrderService from "../order"
import { ProductVariantInventoryServiceMock } from "../__mocks__/product-variant-inventory"
import { LineItemServiceMock } from "../__mocks__/line-item"
import { newTotalsServiceMock } from "../__mocks__/new-totals"
import { ProductVariantInventoryServiceMock } from "../__mocks__/product-variant-inventory"
import { taxProviderServiceMock } from "../__mocks__/tax-provider"
describe("OrderService", () => {
@@ -1072,10 +1072,15 @@ describe("OrderService", () => {
{ no_notification: input }
)
expect(eventBusService.emit).toHaveBeenCalledWith(expect.any(String), {
id: expect.any(String),
no_notification: expected,
})
expect(eventBusService.emit).toHaveBeenCalledWith([
{
eventName: expect.any(String),
data: {
id: expect.any(String),
no_notification: expected,
},
},
])
}
)
})
@@ -1246,17 +1251,10 @@ describe("OrderService", () => {
save: jest.fn().mockImplementation((f) => f),
})
const eventBus = {
emit: () =>
Promise.resolve({
finished: () => Promise.resolve({}),
}),
}
const orderService = new OrderService({
manager: MockManager,
orderRepository: orderRepo,
eventBusService: eventBus,
eventBusService: eventBusService,
})
beforeEach(async () => {

View File

@@ -638,15 +638,17 @@ export default class ClaimService extends TransactionBaseService {
)
const claimOrder = await claimRepo.save(claim)
const eventBusTx = this.eventBus_.withTransaction(transactionManager)
for (const fulfillment of fulfillments) {
await eventBusTx.emit(ClaimService.Events.FULFILLMENT_CREATED, {
const eventsToEmit = fulfillments.map((fulfillment) => ({
eventName: ClaimService.Events.FULFILLMENT_CREATED,
data: {
id: id,
fulfillment_id: fulfillment.id,
no_notification: claim.no_notification,
})
}
},
}))
await this.eventBus_
.withTransaction(transactionManager)
.emit(eventsToEmit)
return claimOrder
}

View File

@@ -122,9 +122,11 @@ export default class CurrencyService extends TransactionBaseService {
)
await currencyRepo.save(currency)
await this.eventBusService_.emit(CurrencyService.Events.UPDATED, {
code,
})
await this.eventBusService_
.withTransaction(transactionManager)
.emit(CurrencyService.Events.UPDATED, {
code,
})
return currency
})

View File

@@ -1,11 +1,12 @@
import Bull, { JobOptions } from "bull"
import Redis from "ioredis"
import { isDefined } from "medusa-core-utils"
import { EntityManager } from "typeorm"
import { DeepPartial, EntityManager, In } from "typeorm"
import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity"
import { ulid } from "ulid"
import { StagedJob } from "../models"
import { StagedJobRepository } from "../repositories/staged-job"
import { ConfigModule, Logger } from "../types/global"
import { isString } from "../utils"
import { sleep } from "../utils/sleep"
import JobSchedulerService, { CreateJobOptions } from "./job-scheduler"
@@ -49,6 +50,12 @@ export type EmitOptions = {
}
} & JobOptions
export type EmitData<T = unknown> = {
eventName: string
data: T
opts?: Record<string, unknown> & EmitOptions
}
/**
* Can keep track of multiple subscribers to different events and run the
* subscribers when events happen. Events will run asynchronously.
@@ -213,6 +220,13 @@ export default class EventBusService {
return this
}
/**
* Calls all subscribers when an event occurs.
* @param data - The data to use to process the events
* @return the jobs from our queue
*/
async emit<T>(data: EmitData<T>[]): Promise<StagedJob[] | void>
/**
* Calls all subscribers when an event occurs.
* @param {string} eventName - the name of the event to be process.
@@ -223,29 +237,49 @@ export default class EventBusService {
async emit<T>(
eventName: string,
data: T,
options: Record<string, unknown> & EmitOptions = { attempts: 1 }
): Promise<StagedJob | void> {
options?: Record<string, unknown> & EmitOptions
): Promise<StagedJob | void>
async emit<
T,
TInput extends string | EmitData<T>[] = string,
TResult = TInput extends EmitData<T>[] ? StagedJob[] : StagedJob
>(
eventNameOrData: TInput,
data?: T,
options: Record<string, unknown> & EmitOptions = {}
): Promise<TResult | void> {
const globalEventOptions = this.config_?.projectConfig?.event_options ?? {}
const isBulkEmit = !isString(eventNameOrData)
const events = isBulkEmit
? eventNameOrData.map((event) => ({
data: { eventName: event.eventName, data: event.data },
opts: event.opts,
}))
: [
{
data: { eventName: eventNameOrData, data },
opts: options,
},
]
// The order of precedence for job options is:
// 1. local options
// 2. global options
// 3. default options
const opts: EmitOptions = {
removeOnComplete: true,
...globalEventOptions,
...options,
const defaultOptions: EmitOptions = {
attempts: 1, // default
removeOnComplete: true, // default
...globalEventOptions, // global
}
if (typeof options.attempts === "number") {
opts.attempts = options.attempts
if (isDefined(options.backoff)) {
opts.backoff = options.backoff
for (const event of events) {
event.opts = {
...defaultOptions,
...(event.opts ?? {}), // local
}
}
if (typeof options.delay === "number") {
opts.delay = options.delay
}
/**
* If we are in an ongoing transaction, we store the jobs in the database
@@ -261,18 +295,20 @@ export default class EventBusService {
this.stagedJobRepository_
)
const jobToCreate = {
event_name: eventName,
data: data as unknown as Record<string, unknown>,
options: opts,
} as Partial<StagedJob>
const jobsToCreate = events.map((event) => {
return stagedJobRepository.create({
event_name: event.data.eventName,
data: event.data.data,
options: event.opts,
} as DeepPartial<StagedJob>) as QueryDeepPartialEntity<StagedJob>
})
const stagedJobInstance = stagedJobRepository.create(jobToCreate)
const stagedJobs = await stagedJobRepository.insertBulk(jobsToCreate)
return await stagedJobRepository.save(stagedJobInstance)
return (!isBulkEmit ? stagedJobs[0] : stagedJobs) as unknown as TResult
}
this.queue_.add({ eventName, data }, opts)
await this.queue_.addBulk(events)
}
startEnqueuer(): void {
@@ -298,18 +334,21 @@ export default class EventBusService {
)
const jobs = await stagedJobRepo.find(listConfig)
await Promise.all(
jobs.map((job) => {
this.queue_
.add(
{ eventName: job.event_name, data: job.data },
{ jobId: job.id, ...job.options }
)
.then(async () => {
await stagedJobRepo.remove(job)
})
})
)
if (!jobs.length) {
await sleep(3000)
continue
}
const eventsData = jobs.map((job) => {
return {
data: { eventName: job.event_name, data: job.data },
opts: { jobId: job.id, ...job.options },
}
})
await this.queue_.addBulk(eventsData).then(async () => {
return await stagedJobRepo.delete({ id: In(jobs.map((j) => j.id)) })
})
await sleep(3000)
}

View File

@@ -17,14 +17,14 @@ import {
PaymentStatus,
Return,
Swap,
TrackingLink,
TrackingLink
} from "../models"
import { AddressRepository } from "../repositories/address"
import { OrderRepository } from "../repositories/order"
import { FindConfig, QuerySelector, Selector } from "../types/common"
import {
CreateFulfillmentOrder,
FulFillmentItemType,
FulFillmentItemType
} from "../types/fulfillment"
import { TotalsContext, UpdateOrderInput } from "../types/orders"
import { CreateShippingMethodDto } from "../types/shipping-options"
@@ -48,7 +48,7 @@ import {
ShippingOptionService,
ShippingProfileService,
TaxProviderService,
TotalsService,
TotalsService
} from "."
export const ORDER_CART_ALREADY_EXISTS_ERROR = "Order from cart already exists"
@@ -521,10 +521,12 @@ class OrderService extends TransactionBaseService {
)
}
await this.eventBus_.emit(OrderService.Events.COMPLETED, {
id: orderId,
no_notification: order.no_notification,
})
await this.eventBus_
.withTransaction(manager)
.emit(OrderService.Events.COMPLETED, {
id: orderId,
no_notification: order.no_notification,
})
order.status = OrderStatus.COMPLETED
@@ -1396,14 +1398,15 @@ class OrderService extends TransactionBaseService {
const evaluatedNoNotification =
no_notification !== undefined ? no_notification : order.no_notification
const eventBusTx = this.eventBus_.withTransaction(manager)
for (const fulfillment of fulfillments) {
await eventBusTx.emit(OrderService.Events.FULFILLMENT_CREATED, {
const eventsToEmit = fulfillments.map((fulfillment) => ({
eventName: OrderService.Events.FULFILLMENT_CREATED,
data: {
id: orderId,
fulfillment_id: fulfillment.id,
no_notification: evaluatedNoNotification,
})
}
},
}))
await this.eventBus_.withTransaction(manager).emit(eventsToEmit)
return result
})
@@ -1560,11 +1563,13 @@ class OrderService extends TransactionBaseService {
const evaluatedNoNotification =
no_notification !== undefined ? no_notification : order.no_notification
await this.eventBus_.emit(OrderService.Events.REFUND_CREATED, {
id: result.id,
refund_id: refund.id,
no_notification: evaluatedNoNotification,
})
await this.eventBus_
.withTransaction(manager)
.emit(OrderService.Events.REFUND_CREATED, {
id: result.id,
refund_id: refund.id,
no_notification: evaluatedNoNotification,
})
return result
})
}

View File

@@ -275,7 +275,9 @@ class UserService extends TransactionBaseService {
await userRepo.softRemove(user)
await this.eventBus_.emit(UserService.Events.DELETED, { id: user.id })
await this.eventBus_
.withTransaction(manager)
.emit(UserService.Events.DELETED, { id: user.id })
return Promise.resolve()
})

View File

@@ -8,3 +8,4 @@ export * from "./calculate-price-tax-amount"
export * from "./csv-cell-content-formatter"
export * from "./exception-formatter"
export * from "./db-aware-column"
export * from "./row-sql-results-to-entity-transformer"

View File

@@ -0,0 +1,27 @@
import { RelationIdLoader } from "typeorm/query-builder/relation-id/RelationIdLoader"
import { RawSqlResultsToEntityTransformer } from "typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer"
import { QueryBuilder, QueryRunner } from "typeorm"
export async function rowSqlResultsToEntityTransformer<T>(
rows: any[],
queryBuilder: QueryBuilder<T>,
queryRunner: QueryRunner
): Promise<T[]> {
const relationIdLoader = new RelationIdLoader(
queryBuilder.connection,
queryRunner,
queryBuilder.expressionMap.relationIdAttributes
)
const transformer = new RawSqlResultsToEntityTransformer(
queryBuilder.expressionMap,
queryBuilder.connection.driver,
[],
[],
queryRunner
)
return transformer.transform(
rows,
queryBuilder.expressionMap.mainAlias!
) as T[]
}