From 3d72002c28db1284819bdcffcac36955fe625627 Mon Sep 17 00:00:00 2001 From: Stevche Radevski Date: Tue, 11 Jun 2024 12:57:25 +0200 Subject: [PATCH] fix: Remove scheduled workflows if they no longer exist when executed (#7673) --- .../src/workflow/local-workflow.ts | 6 +++- .../src/workflow/workflow-manager.ts | 6 +++- .../integration-tests/__tests__/index.spec.ts | 21 +++++++++++++- .../src/services/workflow-orchestrator.ts | 17 +++++++++-- .../utils/workflow-orchestrator-storage.ts | 29 ++++++++++++++----- .../integration-tests/__tests__/index.spec.ts | 19 ++++++++++++ .../utils/workflow-orchestrator-storage.ts | 25 ++++++++++++---- 7 files changed, 103 insertions(+), 20 deletions(-) diff --git a/packages/core/orchestration/src/workflow/local-workflow.ts b/packages/core/orchestration/src/workflow/local-workflow.ts index 7956841be4..dd4ea711fb 100644 --- a/packages/core/orchestration/src/workflow/local-workflow.ts +++ b/packages/core/orchestration/src/workflow/local-workflow.ts @@ -5,6 +5,7 @@ import { isString, MedusaContext, MedusaContextType, + MedusaError, MedusaModuleType, } from "@medusajs/utils" import { asValue } from "awilix" @@ -51,7 +52,10 @@ export class LocalWorkflow { ) { const globalWorkflow = WorkflowManager.getWorkflow(workflowId) if (!globalWorkflow) { - throw new Error(`Workflow with id "${workflowId}" not found.`) + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Workflow with id "${workflowId}" not found.` + ) } this.flow = new OrchestratorBuilder(globalWorkflow.flow_) diff --git a/packages/core/orchestration/src/workflow/workflow-manager.ts b/packages/core/orchestration/src/workflow/workflow-manager.ts index 3bbb8ca295..50bda049e6 100644 --- a/packages/core/orchestration/src/workflow/workflow-manager.ts +++ b/packages/core/orchestration/src/workflow/workflow-manager.ts @@ -11,6 +11,7 @@ import { TransactionStepsDefinition, } from "../transaction" import { WorkflowScheduler } from "./scheduler" +import { MedusaError } from "@medusajs/utils" export interface WorkflowDefinition { id: string @@ -78,7 +79,10 @@ export class WorkflowManager { static getTransactionDefinition(workflowId): OrchestratorBuilder { if (!WorkflowManager.workflows.has(workflowId)) { - throw new Error(`Workflow with id "${workflowId}" not found.`) + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Workflow with id "${workflowId}" not found.` + ) } const workflow = WorkflowManager.workflows.get(workflowId)! diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 45b2b0e899..7ec5d5c929 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,13 +1,14 @@ import { MedusaApp } from "@medusajs/modules-sdk" import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService, MedusaWorkflow } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { DB_URL, TestDatabase } from "../utils" +import { WorkflowManager } from "@medusajs/orchestration" const sharedPgConnection = knex({ client: "pg", @@ -239,5 +240,23 @@ describe("Workflow Orchestrator module", function () { await jest.runOnlyPendingTimersAsync() expect(spy).toHaveBeenCalledTimes(2) }) + + it("should remove scheduled workflow if workflow no longer exists", async () => { + const spy = await createScheduled("remove-scheduled", { + cron: "* * * * * *", + }) + const logSpy = jest.spyOn(console, "warn") + + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(1) + + WorkflowManager["workflows"].delete("remove-scheduled") + + await jest.runOnlyPendingTimersAsync() + expect(spy).toHaveBeenCalledTimes(1) + expect(logSpy).toHaveBeenCalledWith( + "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." + ) + }) }) }) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index d8bad5a78e..516f27b9ca 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -6,7 +6,12 @@ import { WorkflowScheduler, } from "@medusajs/orchestration" import { ContainerLike, Context, MedusaContainer } from "@medusajs/types" -import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils" +import { + InjectSharedContext, + MedusaContext, + MedusaError, + isString, +} from "@medusajs/utils" import { MedusaWorkflow, ReturnWorkflow, @@ -108,7 +113,10 @@ export class WorkflowOrchestratorService { : workflowIdOrWorkflow.getName() if (!workflowId) { - throw new Error("Workflow ID is required") + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Workflow ID is required` + ) } context ??= {} @@ -122,7 +130,10 @@ export class WorkflowOrchestratorService { const exportedWorkflow: any = MedusaWorkflow.getWorkflow(workflowId) if (!exportedWorkflow) { - throw new Error(`Workflow with id "${workflowId}" not found.`) + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `Workflow with id "${workflowId}" not found.` + ) } const flow = exportedWorkflow(container as MedusaContainer) diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index eb94cf8701..e4a54abc8f 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -8,7 +8,7 @@ import { TransactionStep, } from "@medusajs/orchestration" import { ModulesSdkTypes } from "@medusajs/types" -import { TransactionState } from "@medusajs/utils" +import { MedusaError, TransactionState } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" import { CronExpression, parseExpression } from "cron-parser" @@ -266,9 +266,9 @@ export class InMemoryDistributedTransactionStorage } async removeAll(): Promise { - this.scheduled.forEach((_, key) => { - this.remove(key) - }) + for (const [key, job] of this.scheduled) { + await this.remove(key) + } } async jobHandler(jobId: string) { @@ -297,9 +297,22 @@ export class InMemoryDistributedTransactionStorage config: job.config, }) - // With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported. - await this.workflowOrchestratorService_.run(jobId, { - throwOnError: false, - }) + try { + // With running the job after setting a new timer we basically allow for concurrent runs, unless we add idempotency keys once they are supported. + await this.workflowOrchestratorService_.run(jobId, { + throwOnError: false, + }) + } catch (e) { + if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { + console.warn( + `Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.` + ) + + await this.remove(jobId) + return + } + + throw e + } } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 6a1250b3f0..09a9cf68c4 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -2,6 +2,7 @@ import { MedusaApp } from "@medusajs/modules-sdk" import { TransactionStepTimeoutError, TransactionTimeoutError, + WorkflowManager, } from "@medusajs/orchestration" import { RemoteQueryFunction } from "@medusajs/types" import { TransactionHandlerType, TransactionStepState } from "@medusajs/utils" @@ -316,5 +317,23 @@ describe("Workflow Orchestrator module", function () { await setTimeout(3100) expect(spy).toHaveBeenCalledTimes(2) }) + + it("should remove scheduled workflow if workflow no longer exists", async () => { + const spy = await createScheduled("remove-scheduled", { + cron: "* * * * * *", + }) + const logSpy = jest.spyOn(console, "warn") + + await setTimeout(1100) + expect(spy).toHaveBeenCalledTimes(1) + + WorkflowManager["workflows"].delete("remove-scheduled") + + await setTimeout(1100) + expect(spy).toHaveBeenCalledTimes(1) + expect(logSpy).toHaveBeenCalledWith( + "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." + ) + }) }) }) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index c8c5ddbd00..72edff2711 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -7,7 +7,7 @@ import { TransactionStep, } from "@medusajs/orchestration" import { ModulesSdkTypes } from "@medusajs/types" -import { TransactionState, promiseAll } from "@medusajs/utils" +import { MedusaError, TransactionState, promiseAll } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" import Redis from "ioredis" @@ -123,11 +123,24 @@ export class RedisDistributedTransactionStorage jobId: string, schedulerOptions: SchedulerOptions ) { - // TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency - // of the transaction to ensure that the transaction is only executed once. - return await this.workflowOrchestratorService_.run(jobId, { - throwOnError: false, - }) + try { + // TODO: In the case of concurrency being forbidden, we want to generate a predictable transaction ID and rely on the idempotency + // of the transaction to ensure that the transaction is only executed once. + return await this.workflowOrchestratorService_.run(jobId, { + throwOnError: false, + }) + } catch (e) { + if (e instanceof MedusaError && e.type === MedusaError.Types.NOT_FOUND) { + console.warn( + `Tried to execute a scheduled workflow with ID ${jobId} that does not exist, removing it from the scheduler.` + ) + + await this.remove(jobId) + return + } + + throw e + } } async get(key: string): Promise {