fix: Remove scheduled workflows if they no longer exist when executed (#7673)
This commit is contained in:
@@ -5,6 +5,7 @@ import {
|
||||
isString,
|
||||
MedusaContext,
|
||||
MedusaContextType,
|
||||
MedusaError,
|
||||
MedusaModuleType,
|
||||
} from "@medusajs/utils"
|
||||
import { asValue } from "awilix"
|
||||
@@ -51,7 +52,10 @@ export class LocalWorkflow {
|
||||
) {
|
||||
const globalWorkflow = WorkflowManager.getWorkflow(workflowId)
|
||||
if (!globalWorkflow) {
|
||||
throw new Error(`Workflow with id "${workflowId}" not found.`)
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Workflow with id "${workflowId}" not found.`
|
||||
)
|
||||
}
|
||||
|
||||
this.flow = new OrchestratorBuilder(globalWorkflow.flow_)
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
TransactionStepsDefinition,
|
||||
} from "../transaction"
|
||||
import { WorkflowScheduler } from "./scheduler"
|
||||
import { MedusaError } from "@medusajs/utils"
|
||||
|
||||
export interface WorkflowDefinition {
|
||||
id: string
|
||||
@@ -78,7 +79,10 @@ export class WorkflowManager {
|
||||
|
||||
static getTransactionDefinition(workflowId): OrchestratorBuilder {
|
||||
if (!WorkflowManager.workflows.has(workflowId)) {
|
||||
throw new Error(`Workflow with id "${workflowId}" not found.`)
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Workflow with id "${workflowId}" not found.`
|
||||
)
|
||||
}
|
||||
|
||||
const workflow = WorkflowManager.workflows.get(workflowId)!
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import { MedusaApp } from "@medusajs/modules-sdk"
|
||||
import { RemoteJoinerQuery } from "@medusajs/types"
|
||||
import { TransactionHandlerType } from "@medusajs/utils"
|
||||
import { IWorkflowEngineService } from "@medusajs/workflows-sdk"
|
||||
import { IWorkflowEngineService, MedusaWorkflow } from "@medusajs/workflows-sdk"
|
||||
import { knex } from "knex"
|
||||
import { setTimeout as setTimeoutPromise } from "timers/promises"
|
||||
import "../__fixtures__"
|
||||
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
|
||||
import { createScheduled } from "../__fixtures__/workflow_scheduled"
|
||||
import { DB_URL, TestDatabase } from "../utils"
|
||||
import { WorkflowManager } from "@medusajs/orchestration"
|
||||
|
||||
const sharedPgConnection = knex<any, any>({
|
||||
client: "pg",
|
||||
@@ -239,5 +240,23 @@ describe("Workflow Orchestrator module", function () {
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it("should remove scheduled workflow if workflow no longer exists", async () => {
|
||||
const spy = await createScheduled("remove-scheduled", {
|
||||
cron: "* * * * * *",
|
||||
})
|
||||
const logSpy = jest.spyOn(console, "warn")
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
|
||||
WorkflowManager["workflows"].delete("remove-scheduled")
|
||||
|
||||
await jest.runOnlyPendingTimersAsync()
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
expect(logSpy).toHaveBeenCalledWith(
|
||||
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -6,7 +6,12 @@ import {
|
||||
WorkflowScheduler,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ContainerLike, Context, MedusaContainer } from "@medusajs/types"
|
||||
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
|
||||
import {
|
||||
InjectSharedContext,
|
||||
MedusaContext,
|
||||
MedusaError,
|
||||
isString,
|
||||
} from "@medusajs/utils"
|
||||
import {
|
||||
MedusaWorkflow,
|
||||
ReturnWorkflow,
|
||||
@@ -108,7 +113,10 @@ export class WorkflowOrchestratorService {
|
||||
: workflowIdOrWorkflow.getName()
|
||||
|
||||
if (!workflowId) {
|
||||
throw new Error("Workflow ID is required")
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Workflow ID is required`
|
||||
)
|
||||
}
|
||||
|
||||
context ??= {}
|
||||
@@ -122,7 +130,10 @@ export class WorkflowOrchestratorService {
|
||||
|
||||
const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId)
|
||||
if (!exportedWorkflow) {
|
||||
throw new Error(`Workflow with id "${workflowId}" not found.`)
|
||||
throw new MedusaError(
|
||||
MedusaError.Types.NOT_FOUND,
|
||||
`Workflow with id "${workflowId}" not found.`
|
||||
)
|
||||
}
|
||||
|
||||
const flow = exportedWorkflow(container as MedusaContainer)
|
||||
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
TransactionStep,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ModulesSdkTypes } from "@medusajs/types"
|
||||
import { TransactionState } from "@medusajs/utils"
|
||||
import { MedusaError, TransactionState } from "@medusajs/utils"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import { CronExpression, parseExpression } from "cron-parser"
|
||||
|
||||
@@ -266,9 +266,9 @@ export class InMemoryDistributedTransactionStorage
|
||||
}
|
||||
|
||||
async removeAll(): Promise<void> {
|
||||
this.scheduled.forEach((_, key) => {
|
||||
this.remove(key)
|
||||
})
|
||||
for (const [key, job] of this.scheduled) {
|
||||
await this.remove(key)
|
||||
}
|
||||
}
|
||||
|
||||
async jobHandler(jobId: string) {
|
||||
@@ -297,9 +297,22 @@ export class InMemoryDistributedTransactionStorage
|
||||
config: job.config,
|
||||
})
|
||||
|
||||
// With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported.
|
||||
await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
try {
|
||||
// With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported.
|
||||
await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
} catch (e) {
|
||||
if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) {
|
||||
console.warn(
|
||||
`Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.`
|
||||
)
|
||||
|
||||
await this.remove(jobId)
|
||||
return
|
||||
}
|
||||
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import { MedusaApp } from "@medusajs/modules-sdk"
|
||||
import {
|
||||
TransactionStepTimeoutError,
|
||||
TransactionTimeoutError,
|
||||
WorkflowManager,
|
||||
} from "@medusajs/orchestration"
|
||||
import { RemoteQueryFunction } from "@medusajs/types"
|
||||
import { TransactionHandlerType, TransactionStepState } from "@medusajs/utils"
|
||||
@@ -316,5 +317,23 @@ describe("Workflow Orchestrator module", function () {
|
||||
await setTimeout(3100)
|
||||
expect(spy).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it("should remove scheduled workflow if workflow no longer exists", async () => {
|
||||
const spy = await createScheduled("remove-scheduled", {
|
||||
cron: "* * * * * *",
|
||||
})
|
||||
const logSpy = jest.spyOn(console, "warn")
|
||||
|
||||
await setTimeout(1100)
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
|
||||
WorkflowManager["workflows"].delete("remove-scheduled")
|
||||
|
||||
await setTimeout(1100)
|
||||
expect(spy).toHaveBeenCalledTimes(1)
|
||||
expect(logSpy).toHaveBeenCalledWith(
|
||||
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
TransactionStep,
|
||||
} from "@medusajs/orchestration"
|
||||
import { ModulesSdkTypes } from "@medusajs/types"
|
||||
import { TransactionState, promiseAll } from "@medusajs/utils"
|
||||
import { MedusaError, TransactionState, promiseAll } from "@medusajs/utils"
|
||||
import { WorkflowOrchestratorService } from "@services"
|
||||
import { Queue, Worker } from "bullmq"
|
||||
import Redis from "ioredis"
|
||||
@@ -123,11 +123,24 @@ export class RedisDistributedTransactionStorage
|
||||
jobId: string,
|
||||
schedulerOptions: SchedulerOptions
|
||||
) {
|
||||
// TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency
|
||||
// of the transaction to ensure that the transaction is only executed once.
|
||||
return await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
try {
|
||||
// TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency
|
||||
// of the transaction to ensure that the transaction is only executed once.
|
||||
return await this.workflowOrchestratorService_.run(jobId, {
|
||||
throwOnError: false,
|
||||
})
|
||||
} catch (e) {
|
||||
if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) {
|
||||
console.warn(
|
||||
`Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.`
|
||||
)
|
||||
|
||||
await this.remove(jobId)
|
||||
return
|
||||
}
|
||||
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
async get(key: string): Promise<TransactionCheckpoint | undefined> {
|
||||
|
||||
Reference in New Issue
Block a user