feat(notification): Handle long running transaction and add status support (#8900)
RESOLVES FRMW-2110 RESOLVES FRMW-2095 **What** - Fix notification module to not retain transactions open while sending notification - Add support for notification status [success, pending, failure]
This commit is contained in:
committed by
GitHub
parent
dbb10ff051
commit
99461e24ab
@@ -8,7 +8,7 @@ import {
|
||||
ContainerRegistrationKeys,
|
||||
ModuleRegistrationName,
|
||||
} from "@medusajs/utils"
|
||||
import { TestEventUtils, medusaIntegrationTestRunner } from "medusa-test-utils"
|
||||
import { medusaIntegrationTestRunner, TestEventUtils } from "medusa-test-utils"
|
||||
|
||||
jest.setTimeout(50000)
|
||||
|
||||
@@ -91,14 +91,18 @@ medusaIntegrationTestRunner({
|
||||
it("should throw an exception if there is no provider for the channel", async () => {
|
||||
const notification = {
|
||||
to: "test@medusajs.com",
|
||||
template: "order-created",
|
||||
channel: "sms",
|
||||
} as CreateNotificationDTO
|
||||
|
||||
const error = await service
|
||||
.createNotifications(notification)
|
||||
.catch((e) => e)
|
||||
|
||||
const [notificationResult] = await service.listNotifications()
|
||||
|
||||
expect(error.message).toEqual(
|
||||
"Could not find a notification provider for channel: sms"
|
||||
`Could not find a notification provider for channel: sms for notification id ${notificationResult.id}`
|
||||
)
|
||||
})
|
||||
|
||||
|
||||
@@ -99,6 +99,10 @@ export interface NotificationDTO {
|
||||
* The date and time the notification was created.
|
||||
*/
|
||||
created_at: Date
|
||||
/**
|
||||
* The status of the notification
|
||||
*/
|
||||
status: "pending" | "success" | "failure"
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
5
packages/core/utils/src/notification/common.ts
Normal file
5
packages/core/utils/src/notification/common.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
export enum NotificationStatus {
|
||||
PENDING = "pending",
|
||||
SUCCESS = "success",
|
||||
FAILURE = "failure",
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
export * from "./abstract-notification-provider"
|
||||
export * from "./events"
|
||||
export * from "./common"
|
||||
|
||||
@@ -7,6 +7,9 @@ export class NotificationProviderServiceFixtures extends AbstractNotificationPro
|
||||
async send(
|
||||
notification: NotificationTypes.ProviderSendNotificationDTO
|
||||
): Promise<NotificationTypes.ProviderSendNotificationResultsDTO> {
|
||||
if (notification.to === "fail") {
|
||||
throw new Error("Failed to send notification")
|
||||
}
|
||||
return { id: "external_id" }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ import {
|
||||
Module,
|
||||
Modules,
|
||||
NotificationEvents,
|
||||
NotificationStatus,
|
||||
} from "@medusajs/utils"
|
||||
import {
|
||||
MockEventBusService,
|
||||
moduleIntegrationTestRunner,
|
||||
SuiteOptions,
|
||||
} from "medusa-test-utils"
|
||||
import { resolve } from "path"
|
||||
import { NotificationModuleService } from "@services"
|
||||
@@ -32,10 +32,10 @@ let moduleOptions = {
|
||||
|
||||
jest.setTimeout(30000)
|
||||
|
||||
moduleIntegrationTestRunner({
|
||||
moduleIntegrationTestRunner<INotificationModuleService>({
|
||||
moduleName: Modules.NOTIFICATION,
|
||||
moduleOptions,
|
||||
testSuite: ({ service }: SuiteOptions<INotificationModuleService>) =>
|
||||
testSuite: ({ service }) =>
|
||||
describe("Notification Module Service", () => {
|
||||
let eventBusEmitSpy
|
||||
|
||||
@@ -66,7 +66,7 @@ moduleIntegrationTestRunner({
|
||||
})
|
||||
})
|
||||
|
||||
it("sends a notification and stores it in the database", async () => {
|
||||
it("should send a notification and stores it in the database", async () => {
|
||||
const notification = {
|
||||
to: "admin@medusa.com",
|
||||
template: "some-template",
|
||||
@@ -79,11 +79,12 @@ moduleIntegrationTestRunner({
|
||||
expect.objectContaining({
|
||||
provider_id: "test-provider",
|
||||
external_id: "external_id",
|
||||
status: NotificationStatus.SUCCESS,
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
it("emits an event when a notification is created", async () => {
|
||||
it("should emit an event when a notification is created", async () => {
|
||||
const notification = {
|
||||
to: "admin@medusa.com",
|
||||
template: "some-template",
|
||||
@@ -109,7 +110,7 @@ moduleIntegrationTestRunner({
|
||||
)
|
||||
})
|
||||
|
||||
it("ensures the same notification is not sent twice", async () => {
|
||||
it("should ensures the same notification is not sent twice", async () => {
|
||||
const notification = {
|
||||
to: "admin@medusa.com",
|
||||
template: "some-template",
|
||||
@@ -123,11 +124,86 @@ moduleIntegrationTestRunner({
|
||||
expect.objectContaining({
|
||||
provider_id: "test-provider",
|
||||
external_id: "external_id",
|
||||
status: NotificationStatus.SUCCESS,
|
||||
})
|
||||
)
|
||||
|
||||
const secondResult = await service.createNotifications(notification)
|
||||
expect(secondResult).toBe(undefined)
|
||||
})
|
||||
|
||||
it("should manage the status of multiple notification properly in any scenarios", async () => {
|
||||
const notification1 = {
|
||||
to: "admin@medusa.com",
|
||||
template: "some-template",
|
||||
channel: "email",
|
||||
data: {},
|
||||
idempotency_key: "idempotency-key",
|
||||
}
|
||||
|
||||
const notification2 = {
|
||||
to: "0000000000",
|
||||
template: "some-template",
|
||||
channel: "sms",
|
||||
data: {},
|
||||
idempotency_key: "idempotency-key-2",
|
||||
}
|
||||
|
||||
const notification3 = {
|
||||
to: "admin@medusa.com",
|
||||
template: "some-template",
|
||||
channel: "email",
|
||||
data: {},
|
||||
idempotency_key: "idempotency-key-3",
|
||||
}
|
||||
|
||||
const notification4 = {
|
||||
to: "fail",
|
||||
template: "some-template",
|
||||
channel: "email",
|
||||
data: {},
|
||||
idempotency_key: "idempotency-key-4",
|
||||
}
|
||||
|
||||
const err = await service
|
||||
.createNotifications([
|
||||
notification1,
|
||||
notification2,
|
||||
notification3,
|
||||
notification4,
|
||||
])
|
||||
.catch((e) => e)
|
||||
|
||||
const notifications = await service.listNotifications()
|
||||
|
||||
expect(notifications).toHaveLength(4)
|
||||
|
||||
const notification1Result = notifications.find(
|
||||
(n) => n.idempotency_key === "idempotency-key"
|
||||
)!
|
||||
expect(notification1Result.status).toEqual(NotificationStatus.SUCCESS)
|
||||
|
||||
const notification2Result = notifications.find(
|
||||
(n) => n.idempotency_key === "idempotency-key-2"
|
||||
)!
|
||||
expect(notification2Result.status).toEqual(NotificationStatus.FAILURE)
|
||||
|
||||
const notification3Result = notifications.find(
|
||||
(n) => n.idempotency_key === "idempotency-key-3"
|
||||
)!
|
||||
expect(notification3Result.status).toEqual(NotificationStatus.SUCCESS)
|
||||
|
||||
const notification4Result = notifications.find(
|
||||
(n) => n.idempotency_key === "idempotency-key-4"
|
||||
)!
|
||||
expect(notification4Result.status).toEqual(NotificationStatus.FAILURE)
|
||||
|
||||
expect(err).toBeTruthy()
|
||||
expect(err.message).toEqual(
|
||||
`Could not find a notification provider for channel: sms for notification id ${notification2Result.id}
|
||||
Failed to send notification with id ${notification4Result.id}:
|
||||
Failed to send notification`
|
||||
)
|
||||
})
|
||||
}),
|
||||
})
|
||||
|
||||
@@ -212,6 +212,21 @@
|
||||
"nullable": true,
|
||||
"mappedType": "text"
|
||||
},
|
||||
"status": {
|
||||
"name": "status",
|
||||
"type": "text",
|
||||
"unsigned": false,
|
||||
"autoincrement": false,
|
||||
"primary": false,
|
||||
"nullable": false,
|
||||
"default": "'pending'",
|
||||
"enumItems": [
|
||||
"pending",
|
||||
"success",
|
||||
"failure"
|
||||
],
|
||||
"mappedType": "enum"
|
||||
},
|
||||
"provider_id": {
|
||||
"name": "provider_id",
|
||||
"type": "text",
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
import { Migration } from '@mikro-orm/migrations';
|
||||
|
||||
export class Migration20240830094712 extends Migration {
|
||||
|
||||
async up(): Promise<void> {
|
||||
this.addSql('alter table if exists "notification" add column if not exists "status" text check ("status" in (\'pending\', \'success\', \'failure\')) not null default \'pending\';');
|
||||
}
|
||||
|
||||
async down(): Promise<void> {
|
||||
this.addSql('alter table if exists "notification" drop column if exists "status";');
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { model } from "@medusajs/utils"
|
||||
import { model, NotificationStatus } from "@medusajs/utils"
|
||||
import { NotificationProvider } from "./notification-provider"
|
||||
|
||||
// We probably want to have a TTL for each entry, so we don't bloat the DB (and also for GDPR reasons if TTL < 30 days).
|
||||
@@ -24,6 +24,9 @@ export const Notification = model.define("notification", {
|
||||
idempotency_key: model.text().unique().nullable(),
|
||||
// The ID of the notification in the external system, if applicable
|
||||
external_id: model.text().nullable(),
|
||||
// The status of the notification
|
||||
status: model.enum(NotificationStatus).default(NotificationStatus.PENDING),
|
||||
|
||||
provider: model
|
||||
.belongsTo(() => NotificationProvider, { mappedBy: "notifications" })
|
||||
.nullable(),
|
||||
|
||||
@@ -9,11 +9,12 @@ import {
|
||||
} from "@medusajs/types"
|
||||
import {
|
||||
EmitEvents,
|
||||
generateEntityId,
|
||||
InjectManager,
|
||||
InjectTransactionManager,
|
||||
MedusaContext,
|
||||
MedusaError,
|
||||
MedusaService,
|
||||
NotificationStatus,
|
||||
promiseAll,
|
||||
} from "@medusajs/utils"
|
||||
import { Notification } from "@models"
|
||||
@@ -94,7 +95,7 @@ export default class NotificationModuleService
|
||||
return Array.isArray(data) ? serialized : serialized[0]
|
||||
}
|
||||
|
||||
@InjectTransactionManager("baseRepository_")
|
||||
@InjectManager("baseRepository_")
|
||||
protected async createNotifications_(
|
||||
data: NotificationTypes.CreateNotificationDTO[],
|
||||
@MedusaContext() sharedContext: Context = {}
|
||||
@@ -108,57 +109,128 @@ export default class NotificationModuleService
|
||||
const idempotencyKeys = data
|
||||
.map((entry) => entry.idempotency_key)
|
||||
.filter(Boolean)
|
||||
const alreadySentNotifications = await this.notificationService_.list(
|
||||
{
|
||||
idempotency_key: idempotencyKeys,
|
||||
},
|
||||
{ take: null },
|
||||
sharedContext
|
||||
)
|
||||
|
||||
const existsMap = new Map(
|
||||
alreadySentNotifications.map((n) => [n.idempotency_key as string, true])
|
||||
)
|
||||
|
||||
const notificationsToProcess = data.filter(
|
||||
(entry) => !entry.idempotency_key || !existsMap.has(entry.idempotency_key)
|
||||
)
|
||||
|
||||
const notificationsToCreate = await promiseAll(
|
||||
notificationsToProcess.map(async (entry) => {
|
||||
const provider =
|
||||
await this.notificationProviderService_.getProviderForChannel(
|
||||
entry.channel
|
||||
)
|
||||
|
||||
if (!provider) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Could not find a notification provider for channel: ${entry.channel}`
|
||||
)
|
||||
let { notificationsToProcess, createdNotifications } =
|
||||
await this.baseRepository_.transaction(async (txManager) => {
|
||||
const context = {
|
||||
...sharedContext,
|
||||
transactionManager: txManager,
|
||||
}
|
||||
|
||||
if (!provider.is_enabled) {
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Notification provider ${provider.id} is not enabled. To enable it, configure it as a provider in the notification module options.`
|
||||
)
|
||||
}
|
||||
|
||||
const res = await this.notificationProviderService_.send(
|
||||
provider,
|
||||
entry
|
||||
const alreadySentNotifications = await this.notificationService_.list(
|
||||
{
|
||||
idempotency_key: idempotencyKeys,
|
||||
},
|
||||
{ take: null },
|
||||
context
|
||||
)
|
||||
return { ...entry, provider_id: provider.id, external_id: res.id }
|
||||
})
|
||||
)
|
||||
|
||||
// Currently we store notifications after they are sent, which might result in a notification being sent that is not registered in the database.
|
||||
// If necessary, we can switch to a two-step process where we first create the notification, send it, and update it after it being sent.
|
||||
const createdNotifications = await this.notificationService_.create(
|
||||
notificationsToCreate,
|
||||
sharedContext
|
||||
)
|
||||
const existsMap = new Map(
|
||||
alreadySentNotifications.map((n) => [n.idempotency_key as string, n])
|
||||
)
|
||||
|
||||
const notificationsToProcess = data.filter(
|
||||
(entry) =>
|
||||
!entry.idempotency_key ||
|
||||
!existsMap.has(entry.idempotency_key) ||
|
||||
(existsMap.has(entry.idempotency_key) &&
|
||||
existsMap.get(entry.idempotency_key)!.status ===
|
||||
NotificationStatus.FAILURE)
|
||||
)
|
||||
|
||||
const channels = notificationsToProcess.map((not) => not.channel)
|
||||
const providers =
|
||||
await this.notificationProviderService_.getProviderForChannels(
|
||||
channels
|
||||
)
|
||||
|
||||
// Create the notifications to be sent to prevent concurrent actions listing the same notifications
|
||||
const normalizedNotificationsToProcess = notificationsToProcess.map(
|
||||
(entry) => {
|
||||
const provider = providers.find((provider) =>
|
||||
provider?.channels.includes(entry.channel)
|
||||
)
|
||||
|
||||
return {
|
||||
provider,
|
||||
data: {
|
||||
id: generateEntityId(undefined, "noti"),
|
||||
...entry,
|
||||
provider_id: provider?.id,
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
const toCreate = normalizedNotificationsToProcess
|
||||
.filter(
|
||||
(e) =>
|
||||
!e.data.idempotency_key || !existsMap.has(e.data.idempotency_key)
|
||||
)
|
||||
.map((e) => e.data)
|
||||
|
||||
const createdNotifications = toCreate.length
|
||||
? await this.notificationService_.create(toCreate, context)
|
||||
: []
|
||||
|
||||
return {
|
||||
notificationsToProcess: normalizedNotificationsToProcess,
|
||||
createdNotifications,
|
||||
}
|
||||
})
|
||||
|
||||
const notificationToUpdate: { id: string; external_id?: string }[] = []
|
||||
|
||||
try {
|
||||
await promiseAll(
|
||||
notificationsToProcess.map(async (entry) => {
|
||||
const provider = entry.provider
|
||||
|
||||
if (!provider?.is_enabled) {
|
||||
entry.data.status = NotificationStatus.FAILURE
|
||||
notificationToUpdate.push(entry.data)
|
||||
|
||||
const errorMessage = !provider
|
||||
? `Could not find a notification provider for channel: ${entry.data.channel} for notification id ${entry.data.id}`
|
||||
: `Notification provider ${provider.id} is not enabled. To enable it, configure it as a provider in the notification module options.`
|
||||
|
||||
throw new MedusaError(MedusaError.Types.NOT_FOUND, errorMessage)
|
||||
}
|
||||
|
||||
const res = await this.notificationProviderService_
|
||||
.send(provider, entry.data)
|
||||
.catch((e) => {
|
||||
entry.data.status = NotificationStatus.FAILURE
|
||||
notificationToUpdate.push(entry.data)
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.UNEXPECTED_STATE,
|
||||
`Failed to send notification with id ${entry.data.id}:\n${e.message}`
|
||||
)
|
||||
})
|
||||
|
||||
entry.data.external_id = res.id
|
||||
entry.data.status = NotificationStatus.SUCCESS
|
||||
|
||||
notificationToUpdate.push(entry.data)
|
||||
}),
|
||||
{
|
||||
aggregateErrors: true,
|
||||
}
|
||||
)
|
||||
} finally {
|
||||
const updatedNotifications = await this.notificationService_.update(
|
||||
notificationToUpdate,
|
||||
sharedContext
|
||||
)
|
||||
const updatedNotificationsMap = new Map(
|
||||
updatedNotifications.map((n) => [n.id, n])
|
||||
)
|
||||
|
||||
// Maintain the order of the notifications
|
||||
createdNotifications = createdNotifications.map((notification) => {
|
||||
return updatedNotificationsMap.get(notification.id) || notification
|
||||
})
|
||||
}
|
||||
|
||||
return createdNotifications
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ type InjectedDependencies = {
|
||||
]: NotificationTypes.INotificationProvider
|
||||
}
|
||||
|
||||
type Provider = InferEntityType<typeof NotificationProvider>
|
||||
|
||||
export default class NotificationProviderService extends ModulesSdkUtils.MedusaInternalService<
|
||||
InjectedDependencies,
|
||||
typeof NotificationProvider
|
||||
@@ -46,13 +48,13 @@ export default class NotificationProviderService extends ModulesSdkUtils.MedusaI
|
||||
}
|
||||
}
|
||||
|
||||
async getProviderForChannel(
|
||||
channel: string
|
||||
): Promise<InferEntityType<typeof NotificationProvider> | undefined> {
|
||||
async getProviderForChannels<
|
||||
TChannel = string | string[],
|
||||
TOutput = TChannel extends string[] ? Provider[] : Provider | undefined
|
||||
>(channels: TChannel): Promise<TOutput> {
|
||||
if (!this.providersCache) {
|
||||
const providers = await this.notificationProviderRepository_.find()
|
||||
|
||||
type name = (typeof NotificationProvider)["name"]
|
||||
this.providersCache = new Map(
|
||||
providers.flatMap((provider) =>
|
||||
provider.channels.map((c) => [c, provider])
|
||||
@@ -60,7 +62,12 @@ export default class NotificationProviderService extends ModulesSdkUtils.MedusaI
|
||||
)
|
||||
}
|
||||
|
||||
return this.providersCache.get(channel)
|
||||
const normalizedChannels = Array.isArray(channels) ? channels : [channels]
|
||||
const results = normalizedChannels
|
||||
.map((channel) => this.providersCache.get(channel))
|
||||
.filter(Boolean)
|
||||
|
||||
return (Array.isArray(channels) ? results : results[0]) as TOutput
|
||||
}
|
||||
|
||||
async send(
|
||||
|
||||
Reference in New Issue
Block a user