fix(medusa): Make event enqueuer reconnect the database when it lost the connection (#4855)
* Try to periodically reconnect database from enqueuer * Create great-months-train.md --------- Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com> Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
5
.changeset/great-months-train.md
Normal file
5
.changeset/great-months-train.md
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
"@medusajs/medusa": patch
|
||||
---
|
||||
|
||||
fix(medusa): Make event enqueuer reconnect the database when it lost the connection
|
||||
@@ -1,5 +1,5 @@
|
||||
import { EventBusTypes } from "@medusajs/types"
|
||||
import { EventBusUtils } from "@medusajs/utils"
|
||||
import { DatabaseErrorCode, EventBusUtils } from "@medusajs/utils"
|
||||
import { EntityManager } from "typeorm"
|
||||
import { TransactionBaseService } from "../interfaces"
|
||||
import { StagedJob } from "../models"
|
||||
@@ -190,26 +190,35 @@ export default class EventBusService
|
||||
}
|
||||
|
||||
while (this.shouldEnqueuerRun) {
|
||||
const jobs = await this.stagedJobService_.list(listConfig)
|
||||
try {
|
||||
const jobs = await this.stagedJobService_.list(listConfig)
|
||||
|
||||
if (!jobs.length) {
|
||||
await sleep(3000)
|
||||
continue
|
||||
}
|
||||
|
||||
const eventsData = jobs.map((job) => {
|
||||
return {
|
||||
eventName: job.event_name,
|
||||
data: job.data,
|
||||
options: { jobId: job.id, ...job.options },
|
||||
if (!jobs.length) {
|
||||
await sleep(3000)
|
||||
continue
|
||||
}
|
||||
})
|
||||
|
||||
await this.eventBusModuleService_.emit(eventsData).then(async () => {
|
||||
return await this.stagedJobService_.delete(jobs.map((j) => j.id))
|
||||
})
|
||||
const eventsData = jobs.map((job) => {
|
||||
return {
|
||||
eventName: job.event_name,
|
||||
data: job.data,
|
||||
options: { jobId: job.id, ...job.options },
|
||||
}
|
||||
})
|
||||
|
||||
await sleep(3000)
|
||||
await this.eventBusModuleService_.emit(eventsData).then(async () => {
|
||||
return await this.stagedJobService_.delete(jobs.map((j) => j.id))
|
||||
})
|
||||
|
||||
await sleep(3000)
|
||||
} catch (err) {
|
||||
if (DatabaseErrorCode.connectionFailure === err.code) {
|
||||
await sleep(3000)
|
||||
continue
|
||||
}
|
||||
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user