chore: Ensure the transactionId/eventGroupId are passed down the child workflows (#7648)

This commit is contained in:
Adrien de Peretti
2024-06-07 12:38:04 +02:00
committed by GitHub
parent 2e77a076b8
commit c9c2b6c88f
3 changed files with 111 additions and 5 deletions

View File

@@ -10,13 +10,13 @@ const getNewWorkflowId = () => `workflow-${count++}`
describe("Workflow composer", () => {
describe("running sub workflows", () => {
it("should succeed", async function () {
const step1 = createStep("step1", async () => {
const step1 = createStep("step1", async (_, context) => {
return new StepResponse({ result: "step1" })
})
const step2 = createStep("step2", async (input: string) => {
const step2 = createStep("step2", async (input: string, context) => {
return new StepResponse({ result: input })
})
const step3 = createStep("step3", async (input: string) => {
const step3 = createStep("step3", async (input: string, context) => {
return new StepResponse({ result: input })
})
@@ -103,6 +103,102 @@ describe("Workflow composer", () => {
expect(step2Mock).toHaveBeenCalledTimes(1)
expect(step3Mock).toHaveBeenCalledTimes(1)
})
it("should succeed and pass down the transaction id and event group id when provided from the context", async function () {
let parentContext, childContext
const childWorkflowStep1 = createStep("step1", async (_, context) => {
childContext = context
return new StepResponse({ result: "step1" })
})
const childWorkflowStep2 = createStep(
"step2",
async (input: string, context) => {
return new StepResponse({ result: input })
}
)
const step1 = createStep("step3", async (input: string, context) => {
parentContext = context
return new StepResponse({ result: input })
})
const subWorkflow = createWorkflow(
getNewWorkflowId(),
function (input: WorkflowData<string>) {
childWorkflowStep1()
return childWorkflowStep2(input)
}
)
const workflow = createWorkflow(getNewWorkflowId(), function () {
const subWorkflowRes = subWorkflow.runAsStep({
input: "hi from outside",
})
return step1(subWorkflowRes.result)
})
const { result } = await workflow.run({
input: {},
context: {
eventGroupId: "eventGroupId",
transactionId: "transactionId",
},
})
expect(result).toEqual({ result: "hi from outside" })
expect(parentContext.transactionId).toEqual("transactionId")
expect(parentContext.transactionId).toEqual(childContext.transactionId)
expect(parentContext.eventGroupId).toEqual("eventGroupId")
expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId)
})
it("should succeed and pass down the transaction id and event group id when not provided from the context", async function () {
let parentContext, childContext
const childWorkflowStep1 = createStep("step1", async (_, context) => {
childContext = context
return new StepResponse({ result: "step1" })
})
const childWorkflowStep2 = createStep(
"step2",
async (input: string, context) => {
return new StepResponse({ result: input })
}
)
const step1 = createStep("step3", async (input: string, context) => {
parentContext = context
return new StepResponse({ result: input })
})
const subWorkflow = createWorkflow(
getNewWorkflowId(),
function (input: WorkflowData<string>) {
childWorkflowStep1()
return childWorkflowStep2(input)
}
)
const workflow = createWorkflow(getNewWorkflowId(), function () {
const subWorkflowRes = subWorkflow.runAsStep({
input: "hi from outside",
})
return step1(subWorkflowRes.result)
})
const { result } = await workflow.run({
input: {},
})
expect(result).toEqual({ result: "hi from outside" })
expect(parentContext.transactionId).toBeTruthy()
expect(parentContext.transactionId).toEqual(childContext.transactionId)
expect(parentContext.eventGroupId).toBeTruthy()
expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId)
})
})
it("should not throw an unhandled error on failed transformer resolution after a step fail, but should rather push the errors in the errors result", async function () {

View File

@@ -4,9 +4,9 @@ import {
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils"
import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import { StepResponse, resolveValue } from "./helpers"
import { resolveValue, StepResponse } from "./helpers"
import { proxify } from "./helpers/proxy"
import {
CreateWorkflowComposerContext,
@@ -135,6 +135,8 @@ function applyStep<
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId: stepArguments.context!.eventGroupId,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}

View File

@@ -131,6 +131,14 @@ export interface StepExecutionContext {
* {@inheritDoc types!Context}
*/
context: Context
/**
* A string indicating the ID of the group to aggregate the events to be emitted at a later point.
*/
eventGroupId?: string
/**
* A string indicating the ID of the current transaction.
*/
transactionId?: string
}
export type WorkflowTransactionContext = StepExecutionContext &