diff --git a/.changeset/great-months-train.md b/.changeset/great-months-train.md new file mode 100644 index 0000000000..84018fd5b6 --- /dev/null +++ b/.changeset/great-months-train.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +fix(medusa): Make event enqueuer reconnect the database when it lost the connection diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index 8adc762be1..29cacfec78 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,5 +1,5 @@ import { EventBusTypes } from "@medusajs/types" -import { EventBusUtils } from "@medusajs/utils" +import { DatabaseErrorCode, EventBusUtils } from "@medusajs/utils" import { EntityManager } from "typeorm" import { TransactionBaseService } from "../interfaces" import { StagedJob } from "../models" @@ -190,26 +190,35 @@ export default class EventBusService } while (this.shouldEnqueuerRun) { - const jobs = await this.stagedJobService_.list(listConfig) + try { + const jobs = await this.stagedJobService_.list(listConfig) - if (!jobs.length) { - await sleep(3000) - continue - } - - const eventsData = jobs.map((job) => { - return { - eventName: job.event_name, - data: job.data, - options: { jobId: job.id, ...job.options }, + if (!jobs.length) { + await sleep(3000) + continue } - }) - await this.eventBusModuleService_.emit(eventsData).then(async () => { - return await this.stagedJobService_.delete(jobs.map((j) => j.id)) - }) + const eventsData = jobs.map((job) => { + return { + eventName: job.event_name, + data: job.data, + options: { jobId: job.id, ...job.options }, + } + }) - await sleep(3000) + await this.eventBusModuleService_.emit(eventsData).then(async () => { + return await this.stagedJobService_.delete(jobs.map((j) => j.id)) + }) + + await sleep(3000) + } catch (err) { + if (DatabaseErrorCode.connectionFailure === err.code) { + await sleep(3000) + continue + } + + throw err + } } } }