From f0a1355feb5160ff7de1f3d3da769efe29d0d8ed Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Mon, 13 Mar 2023 15:28:51 +0100 Subject: [PATCH] feat(medusa): Bulk emit events (#3407) --- .changeset/serious-avocados-rush.md | 7 + .../plugins/__tests__/inventory/service.js | 2 +- .../src/services/reservation-item.ts | 15 +- .../medusa-test-utils/src/mock-repository.js | 9 + .../medusa/src/repositories/staged-job.ts | 27 +- .../src/services/__tests__/event-bus.js | 284 +++++++++++++----- .../medusa/src/services/__tests__/order.js | 24 +- packages/medusa/src/services/claim.ts | 14 +- packages/medusa/src/services/currency.ts | 8 +- packages/medusa/src/services/event-bus.ts | 109 ++++--- packages/medusa/src/services/order.ts | 39 +-- packages/medusa/src/services/user.ts | 4 +- packages/medusa/src/utils/index.ts | 1 + .../row-sql-results-to-entity-transformer.ts | 27 ++ 14 files changed, 404 insertions(+), 166 deletions(-) create mode 100644 .changeset/serious-avocados-rush.md create mode 100644 packages/medusa/src/utils/row-sql-results-to-entity-transformer.ts diff --git a/.changeset/serious-avocados-rush.md b/.changeset/serious-avocados-rush.md new file mode 100644 index 0000000000..f0ce7eaec9 --- /dev/null +++ b/.changeset/serious-avocados-rush.md @@ -0,0 +1,7 @@ +--- +"@medusajs/inventory": patch +"medusa-test-utils": patch +"@medusajs/medusa": patch +--- + +feat(medusa): Bulk emit events diff --git a/integration-tests/plugins/__tests__/inventory/service.js b/integration-tests/plugins/__tests__/inventory/service.js index 287b271b28..c32faa9ab7 100644 --- a/integration-tests/plugins/__tests__/inventory/service.js +++ b/integration-tests/plugins/__tests__/inventory/service.js @@ -3,7 +3,7 @@ const path = require("path") const { bootstrapApp } = require("../../../helpers/bootstrap-app") const { initDb, useDb } = require("../../../helpers/use-db") -jest.setTimeout(30000) +jest.setTimeout(50000) describe("Inventory Module", () => { let appContainer diff --git a/packages/inventory/src/services/reservation-item.ts b/packages/inventory/src/services/reservation-item.ts index 314821b8e8..5f5d67d43c 100644 --- a/packages/inventory/src/services/reservation-item.ts +++ b/packages/inventory/src/services/reservation-item.ts @@ -1,17 +1,16 @@ import { EntityManager } from "typeorm" import { isDefined, MedusaError } from "medusa-core-utils" import { - FindConfig, buildQuery, - IEventBusService, - FilterableReservationItemProps, CreateReservationItemInput, + FilterableReservationItemProps, + FindConfig, + IEventBusService, TransactionBaseService, UpdateReservationItemInput, } from "@medusajs/medusa" import { ReservationItem } from "../models" -import { CONNECTION_NAME } from "../config" import { InventoryLevelService } from "." type InjectedDependencies = { @@ -278,10 +277,12 @@ export default class ReservationItemService extends TransactionBaseService { item.quantity * -1 ), ]) - }) - await this.eventBusService_.emit(ReservationItemService.Events.DELETED, { - id: reservationItemId, + await this.eventBusService_ + .withTransaction(manager) + .emit(ReservationItemService.Events.DELETED, { + id: reservationItemId, + }) }) } } diff --git a/packages/medusa-test-utils/src/mock-repository.js b/packages/medusa-test-utils/src/mock-repository.js index f0c88056ac..2871f4497a 100644 --- a/packages/medusa-test-utils/src/mock-repository.js +++ b/packages/medusa-test-utils/src/mock-repository.js @@ -12,6 +12,8 @@ class MockRepo { save, findAndCount, del, + count, + insertBulk }) { this.create_ = create; this.update_ = update; @@ -25,12 +27,19 @@ class MockRepo { this.save_ = save; this.findAndCount_ = findAndCount; this.findOneWithRelations_ = findOneWithRelations; + this.insertBulk_ = insertBulk; } setFindOne(fn) { this.findOne_ = fn; } + insertBulk = jest.fn().mockImplementation((...args) => { + if (this.insertBulk_) { + return this.insertBulk_(...args) + } + return {} + }) create = jest.fn().mockImplementation((...args) => { if (this.create_) { return this.create_(...args); diff --git a/packages/medusa/src/repositories/staged-job.ts b/packages/medusa/src/repositories/staged-job.ts index 0323ab29a3..4db1ee7118 100644 --- a/packages/medusa/src/repositories/staged-job.ts +++ b/packages/medusa/src/repositories/staged-job.ts @@ -1,5 +1,28 @@ import { EntityRepository, Repository } from "typeorm" -import { StagedJob } from "../models/staged-job" +import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity" +import { StagedJob } from "../models" +import { rowSqlResultsToEntityTransformer } from "../utils" @EntityRepository(StagedJob) -export class StagedJobRepository extends Repository {} +export class StagedJobRepository extends Repository { + async insertBulk(jobToCreates: QueryDeepPartialEntity[]) { + const queryBuilder = this.createQueryBuilder() + .insert() + .into(StagedJob) + .values(jobToCreates) + + // TODO: remove if statement once this issue is resolved https://github.com/typeorm/typeorm/issues/9850 + if (!queryBuilder.connection.driver.isReturningSqlSupported("insert")) { + const rawStagedJobs = await queryBuilder.execute() + return rawStagedJobs.generatedMaps + } + + const rawStagedJobs = await queryBuilder.returning("*").execute() + + return rowSqlResultsToEntityTransformer( + rawStagedJobs.raw, + queryBuilder, + this.queryRunner! + ) + } +} diff --git a/packages/medusa/src/services/__tests__/event-bus.js b/packages/medusa/src/services/__tests__/event-bus.js index 0532cc8cba..97ac67ea98 100644 --- a/packages/medusa/src/services/__tests__/event-bus.js +++ b/packages/medusa/src/services/__tests__/event-bus.js @@ -114,79 +114,174 @@ describe("EventBusService", () => { }) describe("emit", () => { - let eventBus - let job + const eventName = "eventName" + const defaultOptions = { + attempts: 1, + removeOnComplete: true, + } + + const data = { hi: "1234" } + const bulkData = [{ hi: "1234" }, { hi: "12345" }] + + const mockManager = MockManager + describe("successfully adds job to queue", () => { - beforeAll(() => { - jest.resetAllMocks() - const stagedJobRepository = MockRepository({ - find: () => Promise.resolve([]), + let eventBus + let stagedJobRepository + + beforeEach(() => { + stagedJobRepository = MockRepository({ + insertBulk: async (data) => data, + create: (data) => data, }) eventBus = new EventBusService({ logger: loggerMock, - manager: MockManager, + manager: mockManager, stagedJobRepository, }) - eventBus.queue_.add.mockImplementationOnce(() => "hi") - - job = eventBus.emit("eventName", { hi: "1234" }) + eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") }) - afterAll(async () => { + + afterEach(async () => { await eventBus.stopEnqueuer() + jest.clearAllMocks() }) - it("calls queue.add", () => { - expect(eventBus.queue_.add).toHaveBeenCalled() + it("calls queue.addBulk", async () => { + await eventBus.emit(eventName, data) + + expect(eventBus.queue_.addBulk).toHaveBeenCalled() + expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ + { + data: { + data, + eventName, + }, + opts: defaultOptions, + }, + ]) + }) + + it("calls stagedJob repository insertBulk", async () => { + await eventBus.withTransaction(mockManager).emit(eventName, data) + + expect(stagedJobRepository.create).toHaveBeenCalled() + expect(stagedJobRepository.create).toHaveBeenCalledWith({ + event_name: eventName, + data: data, + options: defaultOptions, + }) + + expect(stagedJobRepository.insertBulk).toHaveBeenCalled() + expect(stagedJobRepository.insertBulk).toHaveBeenCalledWith([ + { + event_name: eventName, + data, + options: defaultOptions, + }, + ]) }) }) - describe("successfully adds job to queue with local options", () => { - beforeAll(() => { - jest.resetAllMocks() - const stagedJobRepository = MockRepository({ - find: () => Promise.resolve([]), + describe("successfully adds jobs in bulk to queue", () => { + let eventBus + let stagedJobRepository + + beforeEach(() => { + stagedJobRepository = MockRepository({ + insertBulk: async (data) => data, + create: (data) => data, }) eventBus = new EventBusService({ logger: loggerMock, - manager: MockManager, + manager: mockManager, stagedJobRepository, }) - eventBus.queue_.add.mockImplementationOnce(() => "hi") - - job = eventBus.emit( - "eventName", - { hi: "1234" }, - { removeOnComplete: 100 } - ) + eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") }) - afterAll(async () => { + + afterEach(async () => { + jest.clearAllMocks() await eventBus.stopEnqueuer() }) - it("calls queue.add", () => { - expect(eventBus.queue_.add).toHaveBeenCalled() - expect(eventBus.queue_.add).toHaveBeenCalledWith( - { eventName: "eventName", data: { hi: "1234" } }, - { removeOnComplete: 100 } - ) + it("calls queue.addBulk", async () => { + await eventBus.emit([ + { eventName, data: bulkData[0] }, + { eventName, data: bulkData[1] }, + ]) + + expect(eventBus.queue_.addBulk).toHaveBeenCalledTimes(1) + expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ + { + data: { + data: bulkData[0], + eventName, + }, + opts: defaultOptions, + }, + { + data: { + data: bulkData[1], + eventName, + }, + opts: defaultOptions, + }, + ]) + }) + + it("calls stagedJob repository insertBulk", async () => { + await eventBus.withTransaction(mockManager).emit([ + { eventName, data: bulkData[0] }, + { eventName, data: bulkData[1] }, + ]) + + expect(stagedJobRepository.create).toHaveBeenCalledTimes(2) + expect(stagedJobRepository.create).toHaveBeenNthCalledWith(1, { + data: bulkData[0], + event_name: eventName, + options: defaultOptions, + }) + expect(stagedJobRepository.create).toHaveBeenNthCalledWith(2, { + data: bulkData[1], + event_name: eventName, + options: defaultOptions, + }) + + expect(stagedJobRepository.insertBulk).toHaveBeenCalledTimes(1) + expect(stagedJobRepository.insertBulk).toHaveBeenCalledWith([ + { + data: bulkData[0], + event_name: eventName, + options: defaultOptions, + }, + { + data: bulkData[1], + event_name: eventName, + options: defaultOptions, + }, + ]) }) }) describe("successfully adds job to queue with global options", () => { - beforeAll(() => { - jest.resetAllMocks() - const stagedJobRepository = MockRepository({ - find: () => Promise.resolve([]), + let eventBus + let stagedJobRepository + + beforeEach(() => { + stagedJobRepository = MockRepository({ + insertBulk: async (data) => data, + create: (data) => data, }) eventBus = new EventBusService( { logger: loggerMock, - manager: MockManager, + manager: mockManager, stagedJobRepository, }, { @@ -194,58 +289,78 @@ describe("EventBusService", () => { } ) - eventBus.queue_.add.mockImplementationOnce(() => "hi") + eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - job = eventBus.emit("eventName", { hi: "1234" }) + eventBus.emit(eventName, data) }) - afterAll(async () => { + + afterEach(async () => { + jest.clearAllMocks() await eventBus.stopEnqueuer() }) - it("calls queue.add", () => { - expect(eventBus.queue_.add).toHaveBeenCalled() - expect(eventBus.queue_.add).toHaveBeenCalledWith( - { eventName: "eventName", data: { hi: "1234" } }, - { removeOnComplete: 10, attempts: 1 } - ) + it("calls queue.addBulk", () => { + expect(eventBus.queue_.addBulk).toHaveBeenCalled() + expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ + { + data: { + data, + eventName, + }, + opts: { removeOnComplete: 10, attempts: 1 }, + }, + ]) }) }) describe("successfully adds job to queue with default options", () => { - beforeAll(() => { - jest.resetAllMocks() - const stagedJobRepository = MockRepository({ - find: () => Promise.resolve([]), + let eventBus + let stagedJobRepository + + beforeEach(() => { + stagedJobRepository = MockRepository({ + insertBulk: async (data) => data, + create: (data) => data, }) eventBus = new EventBusService({ logger: loggerMock, - manager: MockManager, + manager: mockManager, stagedJobRepository, }) - eventBus.queue_.add.mockImplementationOnce(() => "hi") + eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - job = eventBus.emit("eventName", { hi: "1234" }) + eventBus.emit(eventName, data) }) - afterAll(async () => { + + afterEach(async () => { + jest.clearAllMocks() await eventBus.stopEnqueuer() }) - it("calls queue.add", () => { - expect(eventBus.queue_.add).toHaveBeenCalled() - expect(eventBus.queue_.add).toHaveBeenCalledWith( - { eventName: "eventName", data: { hi: "1234" } }, - { removeOnComplete: true, attempts: 1 } - ) + it("calls queue.addBulk", () => { + expect(eventBus.queue_.addBulk).toHaveBeenCalled() + expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ + { + data: { + data, + eventName, + }, + opts: { removeOnComplete: true, attempts: 1 }, + }, + ]) }) }) describe("successfully adds job to queue with local options and global options merged", () => { - beforeAll(() => { - jest.resetAllMocks() - const stagedJobRepository = MockRepository({ - find: () => Promise.resolve([]), + let eventBus + let stagedJobRepository + + beforeEach(() => { + stagedJobRepository = MockRepository({ + insertBulk: async (data) => data, + create: (data) => data, }) eventBus = new EventBusService( @@ -259,29 +374,36 @@ describe("EventBusService", () => { } ) - eventBus.queue_.add.mockImplementationOnce(() => "hi") + eventBus.queue_.addBulk.mockImplementationOnce(() => "hi") - job = eventBus.emit( - "eventName", - { hi: "1234" }, - { attempts: 10, delay: 1000, backoff: { type: "exponential" } } - ) + eventBus.emit(eventName, data, { + attempts: 10, + delay: 1000, + backoff: { type: "exponential" }, + }) }) - afterAll(async () => { + + afterEach(async () => { + jest.clearAllMocks() await eventBus.stopEnqueuer() }) it("calls queue.add", () => { - expect(eventBus.queue_.add).toHaveBeenCalled() - expect(eventBus.queue_.add).toHaveBeenCalledWith( - { eventName: "eventName", data: { hi: "1234" } }, + expect(eventBus.queue_.addBulk).toHaveBeenCalled() + expect(eventBus.queue_.addBulk).toHaveBeenCalledWith([ { - removeOnComplete: 10, // global option - attempts: 10, // local option - delay: 1000, // local option - backoff: { type: "exponential" }, // local option - } - ) + data: { + data, + eventName, + }, + opts: { + removeOnComplete: 10, // global option + attempts: 10, // local option + delay: 1000, // local option + backoff: { type: "exponential" }, // local option + }, + }, + ]) }) }) }) diff --git a/packages/medusa/src/services/__tests__/order.js b/packages/medusa/src/services/__tests__/order.js index 15b27b21d0..c02623bc94 100644 --- a/packages/medusa/src/services/__tests__/order.js +++ b/packages/medusa/src/services/__tests__/order.js @@ -1,8 +1,8 @@ import { IdMap, MockManager, MockRepository } from "medusa-test-utils" import OrderService from "../order" -import { ProductVariantInventoryServiceMock } from "../__mocks__/product-variant-inventory" import { LineItemServiceMock } from "../__mocks__/line-item" import { newTotalsServiceMock } from "../__mocks__/new-totals" +import { ProductVariantInventoryServiceMock } from "../__mocks__/product-variant-inventory" import { taxProviderServiceMock } from "../__mocks__/tax-provider" describe("OrderService", () => { @@ -1072,10 +1072,15 @@ describe("OrderService", () => { { no_notification: input } ) - expect(eventBusService.emit).toHaveBeenCalledWith(expect.any(String), { - id: expect.any(String), - no_notification: expected, - }) + expect(eventBusService.emit).toHaveBeenCalledWith([ + { + eventName: expect.any(String), + data: { + id: expect.any(String), + no_notification: expected, + }, + }, + ]) } ) }) @@ -1246,17 +1251,10 @@ describe("OrderService", () => { save: jest.fn().mockImplementation((f) => f), }) - const eventBus = { - emit: () => - Promise.resolve({ - finished: () => Promise.resolve({}), - }), - } - const orderService = new OrderService({ manager: MockManager, orderRepository: orderRepo, - eventBusService: eventBus, + eventBusService: eventBusService, }) beforeEach(async () => { diff --git a/packages/medusa/src/services/claim.ts b/packages/medusa/src/services/claim.ts index ed02d2fb62..6f0c07ecb6 100644 --- a/packages/medusa/src/services/claim.ts +++ b/packages/medusa/src/services/claim.ts @@ -638,15 +638,17 @@ export default class ClaimService extends TransactionBaseService { ) const claimOrder = await claimRepo.save(claim) - const eventBusTx = this.eventBus_.withTransaction(transactionManager) - - for (const fulfillment of fulfillments) { - await eventBusTx.emit(ClaimService.Events.FULFILLMENT_CREATED, { + const eventsToEmit = fulfillments.map((fulfillment) => ({ + eventName: ClaimService.Events.FULFILLMENT_CREATED, + data: { id: id, fulfillment_id: fulfillment.id, no_notification: claim.no_notification, - }) - } + }, + })) + await this.eventBus_ + .withTransaction(transactionManager) + .emit(eventsToEmit) return claimOrder } diff --git a/packages/medusa/src/services/currency.ts b/packages/medusa/src/services/currency.ts index 7a72474091..c147ed9b02 100644 --- a/packages/medusa/src/services/currency.ts +++ b/packages/medusa/src/services/currency.ts @@ -122,9 +122,11 @@ export default class CurrencyService extends TransactionBaseService { ) await currencyRepo.save(currency) - await this.eventBusService_.emit(CurrencyService.Events.UPDATED, { - code, - }) + await this.eventBusService_ + .withTransaction(transactionManager) + .emit(CurrencyService.Events.UPDATED, { + code, + }) return currency }) diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index 5549b1e36c..404c0e5a03 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,11 +1,12 @@ import Bull, { JobOptions } from "bull" import Redis from "ioredis" -import { isDefined } from "medusa-core-utils" -import { EntityManager } from "typeorm" +import { DeepPartial, EntityManager, In } from "typeorm" +import { QueryDeepPartialEntity } from "typeorm/query-builder/QueryPartialEntity" import { ulid } from "ulid" import { StagedJob } from "../models" import { StagedJobRepository } from "../repositories/staged-job" import { ConfigModule, Logger } from "../types/global" +import { isString } from "../utils" import { sleep } from "../utils/sleep" import JobSchedulerService, { CreateJobOptions } from "./job-scheduler" @@ -49,6 +50,12 @@ export type EmitOptions = { } } & JobOptions +export type EmitData = { + eventName: string + data: T + opts?: Record & EmitOptions +} + /** * Can keep track of multiple subscribers to different events and run the * subscribers when events happen. Events will run asynchronously. @@ -213,6 +220,13 @@ export default class EventBusService { return this } + /** + * Calls all subscribers when an event occurs. + * @param data - The data to use to process the events + * @return the jobs from our queue + */ + async emit(data: EmitData[]): Promise + /** * Calls all subscribers when an event occurs. * @param {string} eventName - the name of the event to be process. @@ -223,29 +237,49 @@ export default class EventBusService { async emit( eventName: string, data: T, - options: Record & EmitOptions = { attempts: 1 } - ): Promise { + options?: Record & EmitOptions + ): Promise + + async emit< + T, + TInput extends string | EmitData[] = string, + TResult = TInput extends EmitData[] ? StagedJob[] : StagedJob + >( + eventNameOrData: TInput, + data?: T, + options: Record & EmitOptions = {} + ): Promise { const globalEventOptions = this.config_?.projectConfig?.event_options ?? {} + const isBulkEmit = !isString(eventNameOrData) + const events = isBulkEmit + ? eventNameOrData.map((event) => ({ + data: { eventName: event.eventName, data: event.data }, + opts: event.opts, + })) + : [ + { + data: { eventName: eventNameOrData, data }, + opts: options, + }, + ] + // The order of precedence for job options is: // 1. local options // 2. global options // 3. default options - const opts: EmitOptions = { - removeOnComplete: true, - ...globalEventOptions, - ...options, + const defaultOptions: EmitOptions = { + attempts: 1, // default + removeOnComplete: true, // default + ...globalEventOptions, // global } - if (typeof options.attempts === "number") { - opts.attempts = options.attempts - if (isDefined(options.backoff)) { - opts.backoff = options.backoff + for (const event of events) { + event.opts = { + ...defaultOptions, + ...(event.opts ?? {}), // local } } - if (typeof options.delay === "number") { - opts.delay = options.delay - } /** * If we are in an ongoing transaction, we store the jobs in the database @@ -261,18 +295,20 @@ export default class EventBusService { this.stagedJobRepository_ ) - const jobToCreate = { - event_name: eventName, - data: data as unknown as Record, - options: opts, - } as Partial + const jobsToCreate = events.map((event) => { + return stagedJobRepository.create({ + event_name: event.data.eventName, + data: event.data.data, + options: event.opts, + } as DeepPartial) as QueryDeepPartialEntity + }) - const stagedJobInstance = stagedJobRepository.create(jobToCreate) + const stagedJobs = await stagedJobRepository.insertBulk(jobsToCreate) - return await stagedJobRepository.save(stagedJobInstance) + return (!isBulkEmit ? stagedJobs[0] : stagedJobs) as unknown as TResult } - this.queue_.add({ eventName, data }, opts) + await this.queue_.addBulk(events) } startEnqueuer(): void { @@ -298,18 +334,21 @@ export default class EventBusService { ) const jobs = await stagedJobRepo.find(listConfig) - await Promise.all( - jobs.map((job) => { - this.queue_ - .add( - { eventName: job.event_name, data: job.data }, - { jobId: job.id, ...job.options } - ) - .then(async () => { - await stagedJobRepo.remove(job) - }) - }) - ) + if (!jobs.length) { + await sleep(3000) + continue + } + + const eventsData = jobs.map((job) => { + return { + data: { eventName: job.event_name, data: job.data }, + opts: { jobId: job.id, ...job.options }, + } + }) + + await this.queue_.addBulk(eventsData).then(async () => { + return await stagedJobRepo.delete({ id: In(jobs.map((j) => j.id)) }) + }) await sleep(3000) } diff --git a/packages/medusa/src/services/order.ts b/packages/medusa/src/services/order.ts index 165c48d153..7490ed7c93 100644 --- a/packages/medusa/src/services/order.ts +++ b/packages/medusa/src/services/order.ts @@ -17,14 +17,14 @@ import { PaymentStatus, Return, Swap, - TrackingLink, + TrackingLink } from "../models" import { AddressRepository } from "../repositories/address" import { OrderRepository } from "../repositories/order" import { FindConfig, QuerySelector, Selector } from "../types/common" import { CreateFulfillmentOrder, - FulFillmentItemType, + FulFillmentItemType } from "../types/fulfillment" import { TotalsContext, UpdateOrderInput } from "../types/orders" import { CreateShippingMethodDto } from "../types/shipping-options" @@ -48,7 +48,7 @@ import { ShippingOptionService, ShippingProfileService, TaxProviderService, - TotalsService, + TotalsService } from "." export const ORDER_CART_ALREADY_EXISTS_ERROR = "Order from cart already exists" @@ -521,10 +521,12 @@ class OrderService extends TransactionBaseService { ) } - await this.eventBus_.emit(OrderService.Events.COMPLETED, { - id: orderId, - no_notification: order.no_notification, - }) + await this.eventBus_ + .withTransaction(manager) + .emit(OrderService.Events.COMPLETED, { + id: orderId, + no_notification: order.no_notification, + }) order.status = OrderStatus.COMPLETED @@ -1396,14 +1398,15 @@ class OrderService extends TransactionBaseService { const evaluatedNoNotification = no_notification !== undefined ? no_notification : order.no_notification - const eventBusTx = this.eventBus_.withTransaction(manager) - for (const fulfillment of fulfillments) { - await eventBusTx.emit(OrderService.Events.FULFILLMENT_CREATED, { + const eventsToEmit = fulfillments.map((fulfillment) => ({ + eventName: OrderService.Events.FULFILLMENT_CREATED, + data: { id: orderId, fulfillment_id: fulfillment.id, no_notification: evaluatedNoNotification, - }) - } + }, + })) + await this.eventBus_.withTransaction(manager).emit(eventsToEmit) return result }) @@ -1560,11 +1563,13 @@ class OrderService extends TransactionBaseService { const evaluatedNoNotification = no_notification !== undefined ? no_notification : order.no_notification - await this.eventBus_.emit(OrderService.Events.REFUND_CREATED, { - id: result.id, - refund_id: refund.id, - no_notification: evaluatedNoNotification, - }) + await this.eventBus_ + .withTransaction(manager) + .emit(OrderService.Events.REFUND_CREATED, { + id: result.id, + refund_id: refund.id, + no_notification: evaluatedNoNotification, + }) return result }) } diff --git a/packages/medusa/src/services/user.ts b/packages/medusa/src/services/user.ts index eda257f93a..905d2a2b0b 100644 --- a/packages/medusa/src/services/user.ts +++ b/packages/medusa/src/services/user.ts @@ -275,7 +275,9 @@ class UserService extends TransactionBaseService { await userRepo.softRemove(user) - await this.eventBus_.emit(UserService.Events.DELETED, { id: user.id }) + await this.eventBus_ + .withTransaction(manager) + .emit(UserService.Events.DELETED, { id: user.id }) return Promise.resolve() }) diff --git a/packages/medusa/src/utils/index.ts b/packages/medusa/src/utils/index.ts index f4d5653093..d78a7356c7 100644 --- a/packages/medusa/src/utils/index.ts +++ b/packages/medusa/src/utils/index.ts @@ -8,3 +8,4 @@ export * from "./calculate-price-tax-amount" export * from "./csv-cell-content-formatter" export * from "./exception-formatter" export * from "./db-aware-column" +export * from "./row-sql-results-to-entity-transformer" diff --git a/packages/medusa/src/utils/row-sql-results-to-entity-transformer.ts b/packages/medusa/src/utils/row-sql-results-to-entity-transformer.ts new file mode 100644 index 0000000000..a91e5ed46e --- /dev/null +++ b/packages/medusa/src/utils/row-sql-results-to-entity-transformer.ts @@ -0,0 +1,27 @@ +import { RelationIdLoader } from "typeorm/query-builder/relation-id/RelationIdLoader" +import { RawSqlResultsToEntityTransformer } from "typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer" +import { QueryBuilder, QueryRunner } from "typeorm" + +export async function rowSqlResultsToEntityTransformer( + rows: any[], + queryBuilder: QueryBuilder, + queryRunner: QueryRunner +): Promise { + const relationIdLoader = new RelationIdLoader( + queryBuilder.connection, + queryRunner, + queryBuilder.expressionMap.relationIdAttributes + ) + const transformer = new RawSqlResultsToEntityTransformer( + queryBuilder.expressionMap, + queryBuilder.connection.driver, + [], + [], + queryRunner + ) + + return transformer.transform( + rows, + queryBuilder.expressionMap.mainAlias! + ) as T[] +}