chore(): only execute race execution checks and publish message only for async workflows (#13396)

* chore(): only execute race execution checks for async workflows

* chore(): workflow redis publish only for async flows

* Create cyan-gorillas-poke.md

* chore(): workflow redis publish only for async flows

* fix negative check

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-09-04 09:12:09 +02:00
committed by GitHub
parent 28830567e7
commit 2b89510df3
7 changed files with 144 additions and 52 deletions

View File

@@ -0,0 +1,6 @@
---
"@medusajs/workflow-engine-inmemory": patch
"@medusajs/workflow-engine-redis": patch
---
chore(): only execute race execution checks for async workflows

View File

@@ -47,11 +47,11 @@ import { createScheduled } from "../__fixtures__/workflow_scheduled"
jest.setTimeout(60000)
const failTrap = (done) => {
const failTrap = (done, name) => {
setTimeoutSync(() => {
// REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending
console.warn(
`Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually.`
`Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}`
)
done()
}, 5000)
@@ -200,7 +200,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
})
failTrap(done)
failTrap(
done,
"should cancel an ongoing execution with async unfinished yet step"
)
})
it("should cancel a complete execution with a sync workflow running as async", async () => {
@@ -537,7 +540,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
})
failTrap(done)
failTrap(
done,
"should subscribe to a async workflow and receive the response when it finishes"
)
})
describe("Testing basic workflow", function () {
@@ -744,7 +750,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
expect(onFinish).toHaveBeenCalledTimes(0)
failTrap(done)
failTrap(
done,
"should subscribe to a async workflow and receive the response when it finishes"
)
})
it("should cancel and revert a completed workflow", async () => {
@@ -817,7 +826,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
})
failTrap(done)
failTrap(
done,
"should not run conditional steps if condition is false"
)
})
it("should not run conditional steps if condition is false", (done) => {
@@ -839,7 +851,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throwOnError: true,
})
failTrap(done)
failTrap(
done,
"should not run conditional steps if condition is false"
)
})
})
@@ -988,7 +1003,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
},
})
failTrap(done)
failTrap(
done,
"should display error when multple async steps are running in parallel"
)
})
})

View File

@@ -14,11 +14,11 @@ import "../__fixtures__"
jest.setTimeout(300000)
const failTrap = (done) => {
const failTrap = (done, name) => {
setTimeoutSync(() => {
// REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending
console.warn(
"Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually."
`Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}`
)
done()
}, 5000)
@@ -99,7 +99,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
.catch((e) => e)
failTrap(done)
failTrap(
done,
"should prevent race continuation of the workflow during retryIntervalAwaiting in background execution"
)
})
it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => {
@@ -190,7 +193,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
.catch((e) => e)
failTrap(done)
failTrap(
done,
"should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution"
)
})
})
},

View File

@@ -285,11 +285,13 @@ export class InMemoryDistributedTransactionStorage
const { retentionTime } = options ?? {}
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
if (data.flow.hasAsyncSteps) {
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
}
// Only store retention time if it's provided
if (retentionTime) {

View File

@@ -15,11 +15,11 @@ import "../__fixtures__"
jest.setTimeout(300000)
const failTrap = (done) => {
const failTrap = (done, name) => {
setTimeoutSync(() => {
// REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending
console.warn(
"Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually."
`Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually. ${name}`
)
done()
}, 5000)
@@ -118,7 +118,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(result).toBe("result from step 0")
})
failTrap(done)
failTrap(
done,
"should prevent race continuation of the workflow during retryIntervalAwaiting in background execution"
)
})
it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => {
@@ -214,7 +217,10 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
expect(result).toBe("result from step 0")
})
failTrap(done)
failTrap(
done,
"should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution"
)
})
})
},

View File

@@ -64,6 +64,7 @@ type IdempotencyKeyParts = {
type NotifyOptions = {
eventType: keyof DistributedTransactionEvents
isFlowAsync: boolean
workflowId: string
transactionId?: string
step?: TransactionStep
@@ -315,12 +316,6 @@ export class WorkflowOrchestratorService {
throw new Error("Transaction ID is required")
}
const events: FlowRunOptions["events"] = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
workflowId,
transactionId: transactionId,
})
const exportedWorkflow = MedusaWorkflow.getWorkflow(workflowId)
if (!exportedWorkflow) {
throw new Error(`Workflow with id "${workflowId}" not found.`)
@@ -344,6 +339,12 @@ export class WorkflowOrchestratorService {
throw new Error("Transaction not found")
}
const events: FlowRunOptions["events"] = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
workflowId,
transactionId: transactionId,
})
const ret = await exportedWorkflow.cancel({
transaction,
throwOnError: false,
@@ -373,6 +374,7 @@ export class WorkflowOrchestratorService {
const { result, errors } = ret
this.notify({
isFlowAsync: ret.transaction.getFlow().hasAsyncSteps,
eventType: "onFinish",
workflowId,
transactionId: transaction.transactionId,
@@ -471,6 +473,7 @@ export class WorkflowOrchestratorService {
const { result, errors } = ret
this.notify({
isFlowAsync: ret.transaction.getFlow().hasAsyncSteps,
eventType: "onFinish",
workflowId,
transactionId,
@@ -544,6 +547,7 @@ export class WorkflowOrchestratorService {
const { result, errors } = ret
this.notify({
isFlowAsync: ret.transaction.getFlow().hasAsyncSteps,
eventType: "onFinish",
workflowId,
transactionId,
@@ -669,16 +673,8 @@ export class WorkflowOrchestratorService {
return
}
if (publish) {
const channel = this.getChannelName(options.workflowId)
const message = JSON.stringify({
instanceId: this.instanceId,
data: options,
})
await this.redisPublisher.publish(channel, message)
}
const {
isFlowAsync,
eventType,
workflowId,
transactionId,
@@ -689,6 +685,15 @@ export class WorkflowOrchestratorService {
state,
} = options
if (publish && isFlowAsync) {
const channel = this.getChannelName(options.workflowId)
const message = JSON.stringify({
instanceId: this.instanceId,
data: options,
})
await this.redisPublisher.publish(channel, message)
}
const subscribers: TransactionSubscribers =
this.subscribers.get(workflowId) ?? new Map()
@@ -698,6 +703,7 @@ export class WorkflowOrchestratorService {
eventType,
workflowId,
transactionId,
isFlowAsync,
step,
response,
result,
@@ -743,6 +749,7 @@ export class WorkflowOrchestratorService {
transactionId,
}): DistributedTransactionEvents {
const notify = async ({
isFlowAsync,
eventType,
step,
result,
@@ -750,6 +757,7 @@ export class WorkflowOrchestratorService {
errors,
state,
}: {
isFlowAsync: boolean
eventType: keyof DistributedTransactionEvents
step?: TransactionStep
response?: unknown
@@ -758,6 +766,7 @@ export class WorkflowOrchestratorService {
state?: TransactionState
}) => {
await this.notify({
isFlowAsync,
workflowId,
transactionId,
eventType,
@@ -772,31 +781,50 @@ export class WorkflowOrchestratorService {
return {
onTimeout: async ({ transaction }) => {
customEventHandlers?.onTimeout?.({ transaction })
await notify({ eventType: "onTimeout" })
await notify({
eventType: "onTimeout",
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onBegin: async ({ transaction }) => {
customEventHandlers?.onBegin?.({ transaction })
await notify({ eventType: "onBegin" })
await notify({
eventType: "onBegin",
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onResume: async ({ transaction }) => {
customEventHandlers?.onResume?.({ transaction })
await notify({ eventType: "onResume" })
await notify({
eventType: "onResume",
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onCompensateBegin: async ({ transaction }) => {
customEventHandlers?.onCompensateBegin?.({ transaction })
await notify({ eventType: "onCompensateBegin" })
await notify({
eventType: "onCompensateBegin",
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onFinish: async ({ transaction, result, errors }) => {
customEventHandlers?.onFinish?.({ transaction, result, errors })
await notify({ eventType: "onFinish" })
await notify({
eventType: "onFinish",
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onStepBegin: async ({ step, transaction }) => {
customEventHandlers?.onStepBegin?.({ step, transaction })
this.activeStepsCount++
await notify({ eventType: "onStepBegin", step })
await notify({
eventType: "onStepBegin",
step,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onStepSuccess: async ({ step, transaction }) => {
const stepName = step.definition.action!
@@ -805,7 +833,12 @@ export class WorkflowOrchestratorService {
transaction
)
customEventHandlers?.onStepSuccess?.({ step, transaction, response })
await notify({ eventType: "onStepSuccess", step, response })
await notify({
eventType: "onStepSuccess",
step,
response,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
this.activeStepsCount--
},
@@ -816,14 +849,23 @@ export class WorkflowOrchestratorService {
.filter((err) => err.action === stepName)
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onStepFailure", step, errors })
await notify({
eventType: "onStepFailure",
step,
errors,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
this.activeStepsCount--
},
onStepAwaiting: async ({ step, transaction }) => {
customEventHandlers?.onStepAwaiting?.({ step, transaction })
await notify({ eventType: "onStepAwaiting", step })
await notify({
eventType: "onStepAwaiting",
step,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
this.activeStepsCount--
},
@@ -837,7 +879,12 @@ export class WorkflowOrchestratorService {
response,
})
await notify({ eventType: "onCompensateStepSuccess", step, response })
await notify({
eventType: "onCompensateStepSuccess",
step,
response,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
onCompensateStepFailure: async ({ step, transaction }) => {
const stepName = step.definition.action!
@@ -847,7 +894,12 @@ export class WorkflowOrchestratorService {
customEventHandlers?.onStepFailure?.({ step, transaction, errors })
await notify({ eventType: "onCompensateStepFailure", step, errors })
await notify({
eventType: "onCompensateStepFailure",
step,
errors,
isFlowAsync: transaction.getFlow().hasAsyncSteps,
})
},
}
}

View File

@@ -413,11 +413,13 @@ export class RedisDistributedTransactionStorage
const { retentionTime } = options ?? {}
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
if (data.flow.hasAsyncSteps) {
await this.#preventRaceConditionExecutionIfNecessary({
data,
key,
options,
})
}
if (hasFinished && retentionTime) {
Object.assign(data, {