From 4d439bed3812e4d4c45bd5c0a872c9b0fd111af8 Mon Sep 17 00:00:00 2001 From: Micky Jittjana Date: Tue, 29 Aug 2023 18:03:11 +0700 Subject: [PATCH] fix(medusa): Make event enqueuer reconnect the database when it lost the connection (#4855) * Try to periodically reconnect database from enqueuer * Create great-months-train.md --------- Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com> Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .changeset/great-months-train.md | 5 +++ packages/medusa/src/services/event-bus.ts | 43 ++++++++++++++--------- 2 files changed, 31 insertions(+), 17 deletions(-) create mode 100644 .changeset/great-months-train.md diff --git a/.changeset/great-months-train.md b/.changeset/great-months-train.md new file mode 100644 index 0000000000..84018fd5b6 --- /dev/null +++ b/.changeset/great-months-train.md @@ -0,0 +1,5 @@ +--- +"@medusajs/medusa": patch +--- + +fix(medusa): Make event enqueuer reconnect the database when it lost the connection diff --git a/packages/medusa/src/services/event-bus.ts b/packages/medusa/src/services/event-bus.ts index 8adc762be1..29cacfec78 100644 --- a/packages/medusa/src/services/event-bus.ts +++ b/packages/medusa/src/services/event-bus.ts @@ -1,5 +1,5 @@ import { EventBusTypes } from "@medusajs/types" -import { EventBusUtils } from "@medusajs/utils" +import { DatabaseErrorCode, EventBusUtils } from "@medusajs/utils" import { EntityManager } from "typeorm" import { TransactionBaseService } from "../interfaces" import { StagedJob } from "../models" @@ -190,26 +190,35 @@ export default class EventBusService } while (this.shouldEnqueuerRun) { - const jobs = await this.stagedJobService_.list(listConfig) + try { + const jobs = await this.stagedJobService_.list(listConfig) - if (!jobs.length) { - await sleep(3000) - continue - } - - const eventsData = jobs.map((job) => { - return { - eventName: job.event_name, - data: job.data, - options: { jobId: job.id, ...job.options }, + if (!jobs.length) { + await sleep(3000) + continue } - }) - await this.eventBusModuleService_.emit(eventsData).then(async () => { - return await this.stagedJobService_.delete(jobs.map((j) => j.id)) - }) + const eventsData = jobs.map((job) => { + return { + eventName: job.event_name, + data: job.data, + options: { jobId: job.id, ...job.options }, + } + }) - await sleep(3000) + await this.eventBusModuleService_.emit(eventsData).then(async () => { + return await this.stagedJobService_.delete(jobs.map((j) => j.id)) + }) + + await sleep(3000) + } catch (err) { + if (DatabaseErrorCode.connectionFailure === err.code) { + await sleep(3000) + continue + } + + throw err + } } } }