diff --git a/integration-tests/modules/__tests__/notification/admin/notification.spec.ts b/integration-tests/modules/__tests__/notification/admin/notification.spec.ts index 33de1f9815..0c568bc49f 100644 --- a/integration-tests/modules/__tests__/notification/admin/notification.spec.ts +++ b/integration-tests/modules/__tests__/notification/admin/notification.spec.ts @@ -8,7 +8,7 @@ import { ContainerRegistrationKeys, ModuleRegistrationName, } from "@medusajs/utils" -import { TestEventUtils, medusaIntegrationTestRunner } from "medusa-test-utils" +import { medusaIntegrationTestRunner, TestEventUtils } from "medusa-test-utils" jest.setTimeout(50000) @@ -91,14 +91,18 @@ medusaIntegrationTestRunner({ it("should throw an exception if there is no provider for the channel", async () => { const notification = { to: "test@medusajs.com", + template: "order-created", channel: "sms", } as CreateNotificationDTO const error = await service .createNotifications(notification) .catch((e) => e) + + const [notificationResult] = await service.listNotifications() + expect(error.message).toEqual( - "Could not find a notification provider for channel: sms" + `Could not find a notification provider for channel: sms for notification id ${notificationResult.id}` ) }) diff --git a/packages/core/types/src/notification/common.ts b/packages/core/types/src/notification/common.ts index 251d1aad28..963eb85347 100644 --- a/packages/core/types/src/notification/common.ts +++ b/packages/core/types/src/notification/common.ts @@ -99,6 +99,10 @@ export interface NotificationDTO { * The date and time the notification was created. */ created_at: Date + /** + * The status of the notification + */ + status: "pending" | "success" | "failure" } /** diff --git a/packages/core/utils/src/notification/common.ts b/packages/core/utils/src/notification/common.ts new file mode 100644 index 0000000000..f4d533e906 --- /dev/null +++ b/packages/core/utils/src/notification/common.ts @@ -0,0 +1,5 @@ +export enum NotificationStatus { + PENDING = "pending", + SUCCESS = "success", + FAILURE = "failure", +} diff --git a/packages/core/utils/src/notification/index.ts b/packages/core/utils/src/notification/index.ts index 295cc03deb..0ea6696c00 100644 --- a/packages/core/utils/src/notification/index.ts +++ b/packages/core/utils/src/notification/index.ts @@ -1,2 +1,3 @@ export * from "./abstract-notification-provider" export * from "./events" +export * from "./common" diff --git a/packages/modules/notification/integration-tests/__fixtures__/providers/default-provider.ts b/packages/modules/notification/integration-tests/__fixtures__/providers/default-provider.ts index 829bb20357..17620ee7a5 100644 --- a/packages/modules/notification/integration-tests/__fixtures__/providers/default-provider.ts +++ b/packages/modules/notification/integration-tests/__fixtures__/providers/default-provider.ts @@ -7,6 +7,9 @@ export class NotificationProviderServiceFixtures extends AbstractNotificationPro async send( notification: NotificationTypes.ProviderSendNotificationDTO ): Promise { + if (notification.to === "fail") { + throw new Error("Failed to send notification") + } return { id: "external_id" } } } diff --git a/packages/modules/notification/integration-tests/__tests__/notification-module-service/index.spec.ts b/packages/modules/notification/integration-tests/__tests__/notification-module-service/index.spec.ts index 392ce19f06..a52c1ec200 100644 --- a/packages/modules/notification/integration-tests/__tests__/notification-module-service/index.spec.ts +++ b/packages/modules/notification/integration-tests/__tests__/notification-module-service/index.spec.ts @@ -5,11 +5,11 @@ import { Module, Modules, NotificationEvents, + NotificationStatus, } from "@medusajs/utils" import { MockEventBusService, moduleIntegrationTestRunner, - SuiteOptions, } from "medusa-test-utils" import { resolve } from "path" import { NotificationModuleService } from "@services" @@ -32,10 +32,10 @@ let moduleOptions = { jest.setTimeout(30000) -moduleIntegrationTestRunner({ +moduleIntegrationTestRunner({ moduleName: Modules.NOTIFICATION, moduleOptions, - testSuite: ({ service }: SuiteOptions) => + testSuite: ({ service }) => describe("Notification Module Service", () => { let eventBusEmitSpy @@ -66,7 +66,7 @@ moduleIntegrationTestRunner({ }) }) - it("sends a notification and stores it in the database", async () => { + it("should send a notification and stores it in the database", async () => { const notification = { to: "admin@medusa.com", template: "some-template", @@ -79,11 +79,12 @@ moduleIntegrationTestRunner({ expect.objectContaining({ provider_id: "test-provider", external_id: "external_id", + status: NotificationStatus.SUCCESS, }) ) }) - it("emits an event when a notification is created", async () => { + it("should emit an event when a notification is created", async () => { const notification = { to: "admin@medusa.com", template: "some-template", @@ -109,7 +110,7 @@ moduleIntegrationTestRunner({ ) }) - it("ensures the same notification is not sent twice", async () => { + it("should ensures the same notification is not sent twice", async () => { const notification = { to: "admin@medusa.com", template: "some-template", @@ -123,11 +124,86 @@ moduleIntegrationTestRunner({ expect.objectContaining({ provider_id: "test-provider", external_id: "external_id", + status: NotificationStatus.SUCCESS, }) ) const secondResult = await service.createNotifications(notification) expect(secondResult).toBe(undefined) }) + + it("should manage the status of multiple notification properly in any scenarios", async () => { + const notification1 = { + to: "admin@medusa.com", + template: "some-template", + channel: "email", + data: {}, + idempotency_key: "idempotency-key", + } + + const notification2 = { + to: "0000000000", + template: "some-template", + channel: "sms", + data: {}, + idempotency_key: "idempotency-key-2", + } + + const notification3 = { + to: "admin@medusa.com", + template: "some-template", + channel: "email", + data: {}, + idempotency_key: "idempotency-key-3", + } + + const notification4 = { + to: "fail", + template: "some-template", + channel: "email", + data: {}, + idempotency_key: "idempotency-key-4", + } + + const err = await service + .createNotifications([ + notification1, + notification2, + notification3, + notification4, + ]) + .catch((e) => e) + + const notifications = await service.listNotifications() + + expect(notifications).toHaveLength(4) + + const notification1Result = notifications.find( + (n) => n.idempotency_key === "idempotency-key" + )! + expect(notification1Result.status).toEqual(NotificationStatus.SUCCESS) + + const notification2Result = notifications.find( + (n) => n.idempotency_key === "idempotency-key-2" + )! + expect(notification2Result.status).toEqual(NotificationStatus.FAILURE) + + const notification3Result = notifications.find( + (n) => n.idempotency_key === "idempotency-key-3" + )! + expect(notification3Result.status).toEqual(NotificationStatus.SUCCESS) + + const notification4Result = notifications.find( + (n) => n.idempotency_key === "idempotency-key-4" + )! + expect(notification4Result.status).toEqual(NotificationStatus.FAILURE) + + expect(err).toBeTruthy() + expect(err.message).toEqual( + `Could not find a notification provider for channel: sms for notification id ${notification2Result.id} +Failed to send notification with id ${notification4Result.id}: +Failed to send notification` + ) + }) }), }) diff --git a/packages/modules/notification/src/migrations/.snapshot-medusa-notification.json b/packages/modules/notification/src/migrations/.snapshot-medusa-notification.json index d3338923df..57b5f9fd6c 100644 --- a/packages/modules/notification/src/migrations/.snapshot-medusa-notification.json +++ b/packages/modules/notification/src/migrations/.snapshot-medusa-notification.json @@ -212,6 +212,21 @@ "nullable": true, "mappedType": "text" }, + "status": { + "name": "status", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "default": "'pending'", + "enumItems": [ + "pending", + "success", + "failure" + ], + "mappedType": "enum" + }, "provider_id": { "name": "provider_id", "type": "text", diff --git a/packages/modules/notification/src/migrations/Migration20240830094712.ts b/packages/modules/notification/src/migrations/Migration20240830094712.ts new file mode 100644 index 0000000000..240579cacf --- /dev/null +++ b/packages/modules/notification/src/migrations/Migration20240830094712.ts @@ -0,0 +1,13 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20240830094712 extends Migration { + + async up(): Promise { + this.addSql('alter table if exists "notification" add column if not exists "status" text check ("status" in (\'pending\', \'success\', \'failure\')) not null default \'pending\';'); + } + + async down(): Promise { + this.addSql('alter table if exists "notification" drop column if exists "status";'); + } + +} diff --git a/packages/modules/notification/src/models/notification.ts b/packages/modules/notification/src/models/notification.ts index a03294d41e..451c564011 100644 --- a/packages/modules/notification/src/models/notification.ts +++ b/packages/modules/notification/src/models/notification.ts @@ -1,4 +1,4 @@ -import { model } from "@medusajs/utils" +import { model, NotificationStatus } from "@medusajs/utils" import { NotificationProvider } from "./notification-provider" // We probably want to have a TTL for each entry, so we don't bloat the DB (and also for GDPR reasons if TTL < 30 days). @@ -24,6 +24,9 @@ export const Notification = model.define("notification", { idempotency_key: model.text().unique().nullable(), // The ID of the notification in the external system, if applicable external_id: model.text().nullable(), + // The status of the notification + status: model.enum(NotificationStatus).default(NotificationStatus.PENDING), + provider: model .belongsTo(() => NotificationProvider, { mappedBy: "notifications" }) .nullable(), diff --git a/packages/modules/notification/src/services/notification-module-service.ts b/packages/modules/notification/src/services/notification-module-service.ts index 39bb5e05e7..a0f3278819 100644 --- a/packages/modules/notification/src/services/notification-module-service.ts +++ b/packages/modules/notification/src/services/notification-module-service.ts @@ -9,11 +9,12 @@ import { } from "@medusajs/types" import { EmitEvents, + generateEntityId, InjectManager, - InjectTransactionManager, MedusaContext, MedusaError, MedusaService, + NotificationStatus, promiseAll, } from "@medusajs/utils" import { Notification } from "@models" @@ -94,7 +95,7 @@ export default class NotificationModuleService return Array.isArray(data) ? serialized : serialized[0] } - @InjectTransactionManager("baseRepository_") + @InjectManager("baseRepository_") protected async createNotifications_( data: NotificationTypes.CreateNotificationDTO[], @MedusaContext() sharedContext: Context = {} @@ -108,57 +109,128 @@ export default class NotificationModuleService const idempotencyKeys = data .map((entry) => entry.idempotency_key) .filter(Boolean) - const alreadySentNotifications = await this.notificationService_.list( - { - idempotency_key: idempotencyKeys, - }, - { take: null }, - sharedContext - ) - const existsMap = new Map( - alreadySentNotifications.map((n) => [n.idempotency_key as string, true]) - ) - - const notificationsToProcess = data.filter( - (entry) => !entry.idempotency_key || !existsMap.has(entry.idempotency_key) - ) - - const notificationsToCreate = await promiseAll( - notificationsToProcess.map(async (entry) => { - const provider = - await this.notificationProviderService_.getProviderForChannel( - entry.channel - ) - - if (!provider) { - throw new MedusaError( - MedusaError.Types.NOT_FOUND, - `Could not find a notification provider for channel: ${entry.channel}` - ) + let { notificationsToProcess, createdNotifications } = + await this.baseRepository_.transaction(async (txManager) => { + const context = { + ...sharedContext, + transactionManager: txManager, } - if (!provider.is_enabled) { - throw new MedusaError( - MedusaError.Types.NOT_FOUND, - `Notification provider ${provider.id} is not enabled. To enable it, configure it as a provider in the notification module options.` - ) - } - - const res = await this.notificationProviderService_.send( - provider, - entry + const alreadySentNotifications = await this.notificationService_.list( + { + idempotency_key: idempotencyKeys, + }, + { take: null }, + context ) - return { ...entry, provider_id: provider.id, external_id: res.id } - }) - ) - // Currently we store notifications after they are sent, which might result in a notification being sent that is not registered in the database. - // If necessary, we can switch to a two-step process where we first create the notification, send it, and update it after it being sent. - const createdNotifications = await this.notificationService_.create( - notificationsToCreate, - sharedContext - ) + const existsMap = new Map( + alreadySentNotifications.map((n) => [n.idempotency_key as string, n]) + ) + + const notificationsToProcess = data.filter( + (entry) => + !entry.idempotency_key || + !existsMap.has(entry.idempotency_key) || + (existsMap.has(entry.idempotency_key) && + existsMap.get(entry.idempotency_key)!.status === + NotificationStatus.FAILURE) + ) + + const channels = notificationsToProcess.map((not) => not.channel) + const providers = + await this.notificationProviderService_.getProviderForChannels( + channels + ) + + // Create the notifications to be sent to prevent concurrent actions listing the same notifications + const normalizedNotificationsToProcess = notificationsToProcess.map( + (entry) => { + const provider = providers.find((provider) => + provider?.channels.includes(entry.channel) + ) + + return { + provider, + data: { + id: generateEntityId(undefined, "noti"), + ...entry, + provider_id: provider?.id, + }, + } + } + ) + + const toCreate = normalizedNotificationsToProcess + .filter( + (e) => + !e.data.idempotency_key || !existsMap.has(e.data.idempotency_key) + ) + .map((e) => e.data) + + const createdNotifications = toCreate.length + ? await this.notificationService_.create(toCreate, context) + : [] + + return { + notificationsToProcess: normalizedNotificationsToProcess, + createdNotifications, + } + }) + + const notificationToUpdate: { id: string; external_id?: string }[] = [] + + try { + await promiseAll( + notificationsToProcess.map(async (entry) => { + const provider = entry.provider + + if (!provider?.is_enabled) { + entry.data.status = NotificationStatus.FAILURE + notificationToUpdate.push(entry.data) + + const errorMessage = !provider + ? `Could not find a notification provider for channel: ${entry.data.channel} for notification id ${entry.data.id}` + : `Notification provider ${provider.id} is not enabled. To enable it, configure it as a provider in the notification module options.` + + throw new MedusaError(MedusaError.Types.NOT_FOUND, errorMessage) + } + + const res = await this.notificationProviderService_ + .send(provider, entry.data) + .catch((e) => { + entry.data.status = NotificationStatus.FAILURE + notificationToUpdate.push(entry.data) + throw new MedusaError( + MedusaError.Types.UNEXPECTED_STATE, + `Failed to send notification with id ${entry.data.id}:\n${e.message}` + ) + }) + + entry.data.external_id = res.id + entry.data.status = NotificationStatus.SUCCESS + + notificationToUpdate.push(entry.data) + }), + { + aggregateErrors: true, + } + ) + } finally { + const updatedNotifications = await this.notificationService_.update( + notificationToUpdate, + sharedContext + ) + const updatedNotificationsMap = new Map( + updatedNotifications.map((n) => [n.id, n]) + ) + + // Maintain the order of the notifications + createdNotifications = createdNotifications.map((notification) => { + return updatedNotificationsMap.get(notification.id) || notification + }) + } return createdNotifications } diff --git a/packages/modules/notification/src/services/notification-provider.ts b/packages/modules/notification/src/services/notification-provider.ts index 9ce377e5b1..f1d39aec2a 100644 --- a/packages/modules/notification/src/services/notification-provider.ts +++ b/packages/modules/notification/src/services/notification-provider.ts @@ -12,6 +12,8 @@ type InjectedDependencies = { ]: NotificationTypes.INotificationProvider } +type Provider = InferEntityType + export default class NotificationProviderService extends ModulesSdkUtils.MedusaInternalService< InjectedDependencies, typeof NotificationProvider @@ -46,13 +48,13 @@ export default class NotificationProviderService extends ModulesSdkUtils.MedusaI } } - async getProviderForChannel( - channel: string - ): Promise | undefined> { + async getProviderForChannels< + TChannel = string | string[], + TOutput = TChannel extends string[] ? Provider[] : Provider | undefined + >(channels: TChannel): Promise { if (!this.providersCache) { const providers = await this.notificationProviderRepository_.find() - type name = (typeof NotificationProvider)["name"] this.providersCache = new Map( providers.flatMap((provider) => provider.channels.map((c) => [c, provider]) @@ -60,7 +62,12 @@ export default class NotificationProviderService extends ModulesSdkUtils.MedusaI ) } - return this.providersCache.get(channel) + const normalizedChannels = Array.isArray(channels) ? channels : [channels] + const results = normalizedChannels + .map((channel) => this.providersCache.get(channel)) + .filter(Boolean) + + return (Array.isArray(channels) ? results : results[0]) as TOutput } async send(