fix: test utils events + workflow storage (#12834)

* feat(test-utils): Make event subscriber waiter robust and concurrent

* feat(test-utils): Make event subscriber waiter robust and concurrent

* fix workflows storage

* remove timeout

* Create gentle-teachers-doubt.md

* revert timestamp

* update changeset

* fix execution loop

* exit if no steps to await

* typo

* check next

* check next

* changeset

* skip when async steps

* wait workflow executions utils

* wait workflow executions utils

* wait workflow executions utils

* increase timeout

* break loop

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-06-30 13:34:08 +02:00
committed by GitHub
parent 1e16fa6f57
commit 95d282e8ef
13 changed files with 441 additions and 190 deletions

View File

@@ -0,0 +1,9 @@
---
"@medusajs/utils": patch
"@medusajs/orchestration": patch
"@medusajs/test-utils": patch
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
---
Fix/test utils events

View File

@@ -8,7 +8,7 @@ import {
import path from "path"
import { ContainerRegistrationKeys } from "@medusajs/utils"
jest.setTimeout(300000)
jest.setTimeout(100000)
medusaIntegrationTestRunner({
medusaConfigFile: path.join(__dirname, "../../../__fixtures__/auth"),

View File

@@ -6,7 +6,7 @@ import {
createAdminUser,
} from "../../../../helpers/create-admin-user"
jest.setTimeout(30000)
jest.setTimeout(100000)
medusaIntegrationTestRunner({
testSuite: ({ dbConnection, getContainer, api }) => {

View File

@@ -841,6 +841,7 @@ export class TransactionOrchestrator extends EventEmitter {
const execution: Promise<void | unknown>[] = []
let i = 0
let hasAsyncSteps = false
for (const step of nextSteps.next) {
const stepIndex = i++
if (!stepsShouldContinueExecution[stepIndex]) {
@@ -876,12 +877,13 @@ export class TransactionOrchestrator extends EventEmitter {
} else {
// Execute async step in background and continue the execution of the transaction
this.executeAsyncStep(promise, transaction, step, nextSteps)
hasAsyncSteps = true
}
}
await promiseAll(execution)
if (nextSteps.next.length === 0) {
if (nextSteps.next.length === 0 || (hasAsyncSteps && !execution.length)) {
continueExecution = false
}
}
@@ -1290,6 +1292,16 @@ export class TransactionOrchestrator extends EventEmitter {
)
}
if (
flow.state === TransactionState.COMPENSATING ||
flow.state === TransactionState.WAITING_TO_COMPENSATE
) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
`Cannot revert a transaction that is already compensating.`
)
}
flow.state = TransactionState.WAITING_TO_COMPENSATE
flow.cancelledAt = Date.now()

File diff suppressed because it is too large Load Diff

View File

@@ -80,6 +80,7 @@ const SPECIAL_PROPERTIES: {
nullable: false,
fieldName: field.fieldName,
defaultRaw: "now()",
onCreate: () => new Date(),
})(MikroORMEntity.prototype, field.fieldName)
},
updated_at: (MikroORMEntity, field) => {
@@ -89,6 +90,8 @@ const SPECIAL_PROPERTIES: {
nullable: false,
fieldName: field.fieldName,
defaultRaw: "now()",
onCreate: () => new Date(),
onUpdate: () => new Date(),
})(MikroORMEntity.prototype, field.fieldName)
},
deleted_at: (MikroORMEntity, field, tableName) => {

View File

@@ -39,7 +39,13 @@ describe("waitSubscribersExecution", () => {
})
it("should reject when timeout is reached before event is fired", async () => {
const waitPromise = waitSubscribersExecution(TEST_EVENT, eventBus as any)
const waitPromise = waitSubscribersExecution(
TEST_EVENT,
eventBus as any,
{
timeout: 5000,
}
)
jest.advanceTimersByTime(5100)

View File

@@ -1,82 +1,175 @@
import { IEventBusModuleService } from "@medusajs/framework/types"
import { EventEmitter } from "events"
// Allows you to wait for all subscribers to execute for a given event. Only works with the local event bus.
export const waitSubscribersExecution = (
eventName: string,
eventBus: IEventBusModuleService,
{
timeout = 5000,
}: {
timeout?: number
} = {}
) => {
const eventEmitter: EventEmitter = (eventBus as any).eventEmitter_
const subscriberPromises: Promise<any>[] = []
const originalListeners = eventEmitter.listeners(eventName)
let timeoutId: NodeJS.Timeout | null = null
type EventBus = {
eventEmitter_: EventEmitter
}
// Create a promise that rejects after the timeout
const timeoutPromise = new Promise((_, reject) => {
type WaitSubscribersExecutionOptions = {
timeout?: number
}
// Map to hold pending promises for each event.
const waits = new Map<string | symbol, Promise<any>>()
/**
* Creates a promise that rejects after a specified timeout.
* @param timeout - The timeout in milliseconds.
* @param eventName - The name of the event being waited on.
* @returns A tuple containing the timeout promise and a function to clear the timeout.
*/
const createTimeoutPromise = (
timeout: number,
eventName: string | symbol
): [Promise<never>, () => void] => {
let timeoutId: NodeJS.Timeout | null = null
const promise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(
new Error(
`Timeout of ${timeout}ms exceeded while waiting for event "${eventName}"`
`Timeout of ${timeout}ms exceeded while waiting for event "${String(
eventName
)}"`
)
)
}, timeout)
timeoutId.unref()
})
return [promise, () => timeoutId && clearTimeout(timeoutId)]
}
// Core logic to wait for subscribers.
const doWaitSubscribersExecution = (
eventName: string | symbol,
eventBus: EventBus,
{ timeout = 15000 }: WaitSubscribersExecutionOptions = {}
): Promise<any> => {
const eventEmitter = eventBus.eventEmitter_
const subscriberPromises: Promise<any>[] = []
const [timeoutPromise, clearTimeout] = createTimeoutPromise(
timeout,
eventName
)
// If there are no existing listeners, resolve once the event happens. Otherwise, wrap the existing subscribers in a promise and resolve once they are done.
if (!eventEmitter.listeners(eventName).length) {
let ok
let ok: (value?: any) => void
const promise = new Promise((resolve) => {
ok = resolve
})
subscriberPromises.push(promise)
eventEmitter.on(eventName, ok)
const newListener = async (...args: any[]) => {
eventEmitter.removeListener(eventName, newListener)
ok(...args)
}
Object.defineProperty(newListener, "__isSubscribersExecutionWrapper", {
value: true,
configurable: true,
enumerable: false,
})
eventEmitter.on(eventName, newListener)
} else {
eventEmitter.listeners(eventName).forEach((listener: any) => {
if (listener.__isSubscribersExecutionWrapper) {
return
}
eventEmitter.removeListener(eventName, listener)
let ok, nok
let ok: (value?: any) => void, nok: (reason?: any) => void
const promise = new Promise((resolve, reject) => {
ok = resolve
nok = reject
})
subscriberPromises.push(promise)
const newListener = async (...args2) => {
const newListener = async (...args2: any[]) => {
// As soon as the subscriber is executed, we restore the original listener
eventEmitter.removeListener(eventName, newListener)
let listenerToAdd = listener
while (listenerToAdd.originalListener) {
listenerToAdd = listenerToAdd.originalListener
}
eventEmitter.on(eventName, listenerToAdd)
try {
const res = await listener.apply(eventBus, args2)
ok(res)
return res
} catch (error) {
nok(error)
}
}
Object.defineProperty(newListener, "__isSubscribersExecutionWrapper", {
value: true,
configurable: true,
enumerable: false,
})
Object.defineProperty(newListener, "originalListener", {
value: listener,
configurable: true,
enumerable: false,
})
eventEmitter.on(eventName, newListener)
})
}
const subscribersPromise = Promise.all(subscriberPromises).finally(() => {
// Clear the timeout since events have been fired and handled
if (timeoutId !== null) {
clearTimeout(timeoutId)
}
// Restore original event listeners
eventEmitter.removeAllListeners(eventName)
originalListeners.forEach((listener) => {
eventEmitter.on(eventName, listener as (...args: any) => void)
})
clearTimeout()
})
// Race between the subscribers and the timeout
return Promise.race([subscribersPromise, timeoutPromise])
}
/**
* Allows you to wait for all subscribers to execute for a given event.
* It ensures that concurrent waits for the same event are queued and executed sequentially.
*
* @param eventName - The name of the event to wait for.
* @param eventBus - The event bus instance.
* @param options - Options including timeout.
*/
export const waitSubscribersExecution = (
eventName: string | symbol,
eventBus: any,
options?: WaitSubscribersExecutionOptions
): Promise<any> => {
const chain = waits.get(eventName)
if (!chain) {
const newPromise = doWaitSubscribersExecution(
eventName,
eventBus,
options
).finally(() => {
// Once this chain is done, remove it from the map
// if it's still the same promise. This prevents race conditions
// where a new wait is queued before this one is removed.
if (waits.get(eventName) === newPromise) {
waits.delete(eventName)
}
})
waits.set(eventName, newPromise)
return newPromise
}
const runner = () => {
return doWaitSubscribersExecution(eventName, eventBus, options)
}
const newPromise = chain.then(runner, runner).finally(() => {
// Once this chain is done, remove it from the map
// if it's still the same promise. This prevents race conditions
// where a new wait is queued before this one is removed.
if (waits.get(eventName) === newPromise) {
waits.delete(eventName)
}
})
waits.set(eventName, newPromise)
return newPromise
}

View File

@@ -0,0 +1,35 @@
import { Modules } from "@medusajs/framework/utils"
import { MedusaContainer } from "@medusajs/types"
/**
* Waits for all workflow executions to finish. When relying on workflows but not necessarily
* waiting for them to finish, this can be used to ensure that a test is not considered done while background executions are still running and can interfere with the other tests.
* @param container - The container instance.
* @returns A promise that resolves when all workflow executions have finished.
*/
export async function waitWorkflowExecutions(container: MedusaContainer) {
const wfe = container.resolve(Modules.WORKFLOW_ENGINE, {
allowUnregistered: true,
})
if (!wfe) {
return
}
const timeout = setTimeout(() => {
throw new Error("Timeout waiting for workflow executions to finish")
}, 10000).unref()
let waitWorkflowsToFinish = true
while (waitWorkflowsToFinish) {
const executions = await wfe.listWorkflowExecutions({
state: { $nin: ["not_started", "done", "reverted", "failed"] },
})
if (executions.length === 0) {
waitWorkflowsToFinish = false
clearTimeout(timeout)
break
}
await new Promise((resolve) => setTimeout(resolve, 50))
}
}

View File

@@ -18,6 +18,7 @@ import {
startApp,
syncLinks,
} from "./medusa-test-runner-utils"
import { waitWorkflowExecutions } from "./medusa-test-runner-utils/wait-workflow-executions"
export interface MedusaSuiteOptions {
dbConnection: any // knex instance
@@ -34,6 +35,9 @@ export interface MedusaSuiteOptions {
clientUrl: string
}
getMedusaApp: () => MedusaAppOutput
utils: {
waitWorkflowExecutions: (container: MedusaContainer) => Promise<void>
}
}
interface TestRunnerConfig {
@@ -268,6 +272,7 @@ class MedusaTestRunner {
public async afterEach(): Promise<void> {
try {
await waitWorkflowExecutions(this.globalContainer as MedusaContainer)
await this.dbUtils.teardown({ schema: this.schema })
} catch (error) {
logger.error("Error tearing down database:", error?.message)
@@ -287,6 +292,10 @@ class MedusaTestRunner {
clientUrl: this.dbConfig.clientUrl,
},
dbUtils: this.dbUtils,
utils: {
waitWorkflowExecutions: () =>
waitWorkflowExecutions(this.globalContainer as MedusaContainer),
},
}
}
}

View File

@@ -157,6 +157,7 @@ export class InMemoryDistributedTransactionStorage
.catch(() => undefined)
if (trx) {
const { flow, errors } = this.storage.get(key) ?? {}
const { idempotent } = options ?? {}
const execution = trx.execution as TransactionFlow
@@ -183,9 +184,9 @@ export class InMemoryDistributedTransactionStorage
}
return {
flow: trx.execution as TransactionFlow,
flow: flow ?? (trx.execution as TransactionFlow),
context: trx.context?.data as TransactionContext,
errors: trx.context?.errors as TransactionStepError[],
errors: errors ?? (trx.context?.errors as TransactionStepError[]),
}
}

View File

@@ -1153,7 +1153,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
const notExpiredWorkflowId = "not-expired-workflow-" + ulid()
createWorkflow(
{ name: notExpiredWorkflowId, retentionTime: 1000 },
{ name: notExpiredWorkflowId, retentionTime: 10000 },
() => {
return new WorkflowResponse("not expired")
}

View File

@@ -291,6 +291,15 @@ export class RedisDistributedTransactionStorage
.catch(() => undefined)
if (trx) {
const rawData = await this.redisClient.get(key)
let flow!: TransactionFlow, errors!: TransactionStepError[]
if (rawData) {
const data = JSON.parse(rawData)
flow = data.flow
errors = data.errors
}
const { idempotent } = options ?? {}
const execution = trx.execution as TransactionFlow
@@ -317,9 +326,9 @@ export class RedisDistributedTransactionStorage
}
return {
flow: trx.execution as TransactionFlow,
flow: flow ?? (trx.execution as TransactionFlow),
context: trx.context?.data as TransactionContext,
errors: trx.context?.errors as TransactionStepError[],
errors: errors ?? (trx.context?.errors as TransactionStepError[]),
}
}