From bcba5b3a6cf1aa1275f6553eacdb3726a4d860b3 Mon Sep 17 00:00:00 2001 From: Stevche Radevski Date: Tue, 23 Jul 2024 16:59:27 +0200 Subject: [PATCH] chore: Make scheduled job tests less likely to fail (#8241) --- .../__fixtures__/workflow_scheduled.ts | 7 ++- .../integration-tests/__tests__/index.spec.ts | 62 ++++++++++++++----- .../utils/workflow-orchestrator-storage.ts | 9 ++- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts index 81213a48f0..ccf95f1e19 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__fixtures__/workflow_scheduled.ts @@ -5,8 +5,13 @@ import { StepResponse, } from "@medusajs/workflows-sdk" -export const createScheduled = (name: string, schedule?: SchedulerOptions) => { +export const createScheduled = ( + name: string, + next: () => void, + schedule?: SchedulerOptions +) => { const workflowScheduledStepInvoke = jest.fn((input, { container }) => { + next() return new StepResponse({ testValue: container.resolve("test-value"), }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 57d1ba308a..f4292c934e 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -24,6 +24,7 @@ import "../__fixtures__" import { createScheduled } from "../__fixtures__/workflow_scheduled" import { DB_URL, TestDatabase } from "../utils" import { WorkflowsModuleService } from "@medusajs/workflow-engine-inmemory/dist/services" +import Redis from "ioredis" jest.setTimeout(100000) @@ -40,6 +41,24 @@ const afterEach_ = async () => { await TestDatabase.clearTables(sharedPgConnection) } +function times(num) { + let resolver + let counter = 0 + const promise = new Promise((resolve) => { + resolver = resolve + }) + + return { + next: () => { + counter += 1 + if (counter === num) { + resolver() + } + }, + promise, + } +} + describe("Workflow Orchestrator module", function () { let workflowOrcModule: IWorkflowEngineService let query: RemoteQueryFunction @@ -49,6 +68,10 @@ describe("Workflow Orchestrator module", function () { const container = createMedusaContainer() container.register(ContainerRegistrationKeys.LOGGER, asValue(console)) + // Clear any residual data in Redis + const redisClient = new Redis() + redisClient.flushall() + const { runMigrations, query: remoteQuery, @@ -83,10 +106,6 @@ describe("Workflow Orchestrator module", function () { workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) - afterAll(async () => { - await WorkflowManager.unregisterAll() - }) - it(`should export the appropriate linkable configuration`, () => { const linkable = Module(Modules.WORKFLOW_ENGINE, { service: WorkflowsModuleService, @@ -352,33 +371,44 @@ describe("Workflow Orchestrator module", function () { // Mocking bullmq, however, would make the tests close to useless, so we can keep them very minimal and serve as smoke tests. describe("Scheduled workflows", () => { it("should execute a scheduled workflow", async () => { - const spy = createScheduled("standard") - await setTimeout(3100) - expect(spy).toHaveBeenCalledTimes(3) + const wait = times(2) + const spy = createScheduled("standard", wait.next) + + await wait.promise + expect(spy).toHaveBeenCalledTimes(2) + WorkflowManager.unregister("standard") }) it("should stop executions after the set number of executions", async () => { - const spy = await createScheduled("num-executions", { + const wait = times(2) + const spy = await createScheduled("num-executions", wait.next, { cron: "* * * * * *", numberOfExecutions: 2, }) - await setTimeout(3100) + + await wait.promise expect(spy).toHaveBeenCalledTimes(2) + + // Make sure that on the next tick it doesn't execute again + await setTimeout(1100) + expect(spy).toHaveBeenCalledTimes(2) + + WorkflowManager.unregister("num-execution") }) it("should remove scheduled workflow if workflow no longer exists", async () => { + const wait = times(1) const logger = sharedContainer_.resolve( ContainerRegistrationKeys.LOGGER ) - const spy = await createScheduled("remove-scheduled", { + const spy = await createScheduled("remove-scheduled", wait.next, { cron: "* * * * * *", }) const logSpy = jest.spyOn(logger, "warn") - await setTimeout(1100) + await wait.promise expect(spy).toHaveBeenCalledTimes(1) - WorkflowManager["workflows"].delete("remove-scheduled") await setTimeout(1100) @@ -389,19 +419,23 @@ describe("Workflow Orchestrator module", function () { }) it("the scheduled workflow should have access to the shared container", async () => { + const wait = times(1) sharedContainer_.register( "test-value", asFunction(() => "test") ) - const spy = await createScheduled("shared-container-job", { + const spy = await createScheduled("shared-container-job", wait.next, { cron: "* * * * * *", }) - await setTimeout(1100) + + await wait.promise expect(spy).toHaveBeenCalledTimes(1) + expect(spy).toHaveReturnedWith( expect.objectContaining({ output: { testValue: "test" } }) ) + WorkflowManager.unregister("shared-container-job") }) }) }) diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 37803d67f6..b7107e2017 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -395,7 +395,14 @@ export class RedisDistributedTransactionStorage } async remove(jobId: string): Promise { - await this.queue.removeRepeatableByKey(`${JobType.SCHEDULE}_${jobId}`) + const repeatableJobs = await this.queue.getRepeatableJobs() + const job = repeatableJobs.find( + (job) => job.id === `${JobType.SCHEDULE}_${jobId}` + ) + + if (job) { + await this.queue.removeRepeatableByKey(job.key) + } } async removeAll(): Promise {