chore(event-bus): event bus error handling (#10085)
This commit is contained in:
committed by
GitHub
parent
98bf8c9006
commit
3e265229f2
8
.changeset/orange-donkeys-hammer.md
Normal file
8
.changeset/orange-donkeys-hammer.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
"@medusajs/event-bus-local": patch
|
||||
"@medusajs/event-bus-redis": patch
|
||||
"@medusajs/orchestration": patch
|
||||
"@medusajs/utils": patch
|
||||
---
|
||||
|
||||
Improve event bus error handling
|
||||
@@ -58,30 +58,3 @@ export class TransactionTimeoutError extends Error {
|
||||
this.name = "TransactionTimeoutError"
|
||||
}
|
||||
}
|
||||
|
||||
export function serializeError(error) {
|
||||
const serialized = {
|
||||
message: error.message,
|
||||
name: error.name,
|
||||
stack: error.stack,
|
||||
}
|
||||
|
||||
Object.getOwnPropertyNames(error).forEach((key) => {
|
||||
// eslint-disable-next-line no-prototype-builtins
|
||||
if (!serialized.hasOwnProperty(key)) {
|
||||
serialized[key] = error[key]
|
||||
}
|
||||
})
|
||||
|
||||
return serialized
|
||||
}
|
||||
|
||||
export function isErrorLike(value) {
|
||||
return (
|
||||
!!value &&
|
||||
typeof value === "object" &&
|
||||
"name" in value &&
|
||||
"message" in value &&
|
||||
"stack" in value
|
||||
)
|
||||
}
|
||||
|
||||
@@ -17,12 +17,16 @@ import {
|
||||
TransactionStepStatus,
|
||||
} from "./types"
|
||||
|
||||
import { MedusaError, promiseAll, TransactionStepState } from "@medusajs/utils"
|
||||
import { EventEmitter } from "events"
|
||||
import {
|
||||
isErrorLike,
|
||||
PermanentStepFailureError,
|
||||
MedusaError,
|
||||
promiseAll,
|
||||
serializeError,
|
||||
TransactionStepState,
|
||||
} from "@medusajs/utils"
|
||||
import { EventEmitter } from "events"
|
||||
import {
|
||||
PermanentStepFailureError,
|
||||
SkipStepResponse,
|
||||
TransactionStepTimeoutError,
|
||||
TransactionTimeoutError,
|
||||
|
||||
@@ -18,6 +18,7 @@ export * from "./dynamic-import"
|
||||
export * from "./env-editor"
|
||||
export * from "./errors"
|
||||
export * from "./file-system"
|
||||
export * from "./filter-operator-map"
|
||||
export * from "./generate-entity-id"
|
||||
export * from "./get-caller-file-path"
|
||||
export * from "./get-config-file"
|
||||
@@ -34,6 +35,7 @@ export * from "./is-boolean"
|
||||
export * from "./is-date"
|
||||
export * from "./is-defined"
|
||||
export * from "./is-email"
|
||||
export * from "./is-error-like"
|
||||
export * from "./is-object"
|
||||
export * from "./is-present"
|
||||
export * from "./is-string"
|
||||
@@ -62,6 +64,7 @@ export * from "./remove-undefined-properties"
|
||||
export * from "./resolve-exports"
|
||||
export * from "./rules"
|
||||
export * from "./selector-constraints-to-string"
|
||||
export * from "./serialize-error"
|
||||
export * from "./set-metadata"
|
||||
export * from "./simple-hash"
|
||||
export * from "./string-to-select-relation-object"
|
||||
@@ -74,4 +77,3 @@ export * from "./trim-zeros"
|
||||
export * from "./upper-case-first"
|
||||
export * from "./validate-handle"
|
||||
export * from "./wrap-handler"
|
||||
export * from "./filter-operator-map"
|
||||
|
||||
9
packages/core/utils/src/common/is-error-like.ts
Normal file
9
packages/core/utils/src/common/is-error-like.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
export function isErrorLike(value) {
|
||||
return (
|
||||
!!value &&
|
||||
typeof value === "object" &&
|
||||
"name" in value &&
|
||||
"message" in value &&
|
||||
"stack" in value
|
||||
)
|
||||
}
|
||||
16
packages/core/utils/src/common/serialize-error.ts
Normal file
16
packages/core/utils/src/common/serialize-error.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
export function serializeError(error) {
|
||||
const serialized = {
|
||||
message: error.message,
|
||||
name: error.name,
|
||||
stack: error.stack,
|
||||
}
|
||||
|
||||
Object.getOwnPropertyNames(error).forEach((key) => {
|
||||
// eslint-disable-next-line no-prototype-builtins
|
||||
if (!serialized.hasOwnProperty(key)) {
|
||||
serialized[key] = error[key]
|
||||
}
|
||||
})
|
||||
|
||||
return serialized
|
||||
}
|
||||
@@ -143,10 +143,11 @@ export default class LocalEventBusService extends AbstractEventBusModuleService
|
||||
this.eventEmitter_.on(event, async (data: Event) => {
|
||||
try {
|
||||
await subscriber(data)
|
||||
} catch (e) {
|
||||
} catch (err) {
|
||||
this.logger_?.error(
|
||||
`An error occurred while processing ${event.toString()}: ${e}`
|
||||
`An error occurred while processing ${event.toString()}:`
|
||||
)
|
||||
this.logger_?.error(err)
|
||||
}
|
||||
})
|
||||
return this
|
||||
|
||||
@@ -16,9 +16,9 @@ jest.mock("bullmq")
|
||||
jest.mock("ioredis")
|
||||
|
||||
const loggerMock = {
|
||||
info: jest.fn().mockReturnValue(console.log),
|
||||
warn: jest.fn().mockReturnValue(console.log),
|
||||
error: jest.fn().mockReturnValue(console.log),
|
||||
info: jest.fn().mockImplementation(console.log),
|
||||
warn: jest.fn().mockImplementation(console.warn),
|
||||
error: jest.fn().mockImplementation(console.error),
|
||||
} as unknown as Logger
|
||||
|
||||
const redisMock = {
|
||||
@@ -376,7 +376,7 @@ describe("RedisEventBusService", () => {
|
||||
})
|
||||
eventBus.subscribe("eventName", () => {
|
||||
test.push("fail1")
|
||||
return Promise.reject("fail1")
|
||||
throw new Error("fail1")
|
||||
})
|
||||
eventBus.subscribe("eventName", () => {
|
||||
test.push("hi2")
|
||||
@@ -399,15 +399,21 @@ describe("RedisEventBusService", () => {
|
||||
"Processing eventName which has 4 subscribers"
|
||||
)
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
"An error occurred while processing eventName: fail1"
|
||||
)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
"An error occurred while processing eventName: fail2"
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(5)
|
||||
expect(loggerMock.warn).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
"An error occurred while processing eventName:"
|
||||
)
|
||||
expect(loggerMock.warn).toHaveBeenNthCalledWith(2, new Error("fail1"))
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
expect(loggerMock.warn).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
"An error occurred while processing eventName:"
|
||||
)
|
||||
expect(loggerMock.warn).toHaveBeenNthCalledWith(4, "fail2")
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenNthCalledWith(
|
||||
5,
|
||||
"One or more subscribers of eventName failed. Retrying is not configured. Use 'attempts' option when emitting events."
|
||||
)
|
||||
|
||||
@@ -439,10 +445,11 @@ describe("RedisEventBusService", () => {
|
||||
} as any)
|
||||
.catch((error) => void 0)
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(1)
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
"An error occurred while processing eventName: fail1"
|
||||
"An error occurred while processing eventName:"
|
||||
)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")
|
||||
|
||||
expect(loggerMock.info).toHaveBeenCalledTimes(2)
|
||||
expect(loggerMock.info).toHaveBeenCalledWith(
|
||||
@@ -478,10 +485,12 @@ describe("RedisEventBusService", () => {
|
||||
} as any)
|
||||
.catch((err) => void 0)
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(2)
|
||||
expect(loggerMock.warn).toHaveBeenCalledTimes(3)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
"An error occurred while processing eventName: fail1"
|
||||
"An error occurred while processing eventName:"
|
||||
)
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith("fail1")
|
||||
|
||||
expect(loggerMock.warn).toHaveBeenCalledWith(
|
||||
"One or more subscribers of eventName failed. Retrying..."
|
||||
)
|
||||
|
||||
@@ -276,18 +276,18 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
|
||||
metadata: data.metadata,
|
||||
}
|
||||
|
||||
return await subscriber(event)
|
||||
.then(async (data) => {
|
||||
try {
|
||||
return await subscriber(event).then((data) => {
|
||||
// For every subscriber that completes successfully, add their id to the list of completed subscribers
|
||||
completedSubscribersInCurrentAttempt.push(id)
|
||||
return data
|
||||
})
|
||||
.catch((err) => {
|
||||
this.logger_.warn(
|
||||
`An error occurred while processing ${name}: ${err}`
|
||||
)
|
||||
return err
|
||||
})
|
||||
} catch (err) {
|
||||
this.logger_?.warn(`An error occurred while processing ${name}:`)
|
||||
this.logger_?.warn(err)
|
||||
|
||||
return err
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user