From c9c2b6c88fc9ad956fcdd91d7ef7d85167fa8003 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Fri, 7 Jun 2024 12:38:04 +0200 Subject: [PATCH] chore: Ensure the transactionId/eventGroupId are passed down the child workflows (#7648) --- .../utils/composer/__tests__/index.spec.ts | 102 +++++++++++++++++- .../src/utils/composer/create-step.ts | 6 +- .../workflows-sdk/src/utils/composer/type.ts | 8 ++ 3 files changed, 111 insertions(+), 5 deletions(-) diff --git a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts index 53217123c3..55f7b2a8fc 100644 --- a/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts +++ b/packages/core/workflows-sdk/src/utils/composer/__tests__/index.spec.ts @@ -10,13 +10,13 @@ const getNewWorkflowId = () => `workflow-${count++}` describe("Workflow composer", () => { describe("running sub workflows", () => { it("should succeed", async function () { - const step1 = createStep("step1", async () => { + const step1 = createStep("step1", async (_, context) => { return new StepResponse({ result: "step1" }) }) - const step2 = createStep("step2", async (input: string) => { + const step2 = createStep("step2", async (input: string, context) => { return new StepResponse({ result: input }) }) - const step3 = createStep("step3", async (input: string) => { + const step3 = createStep("step3", async (input: string, context) => { return new StepResponse({ result: input }) }) @@ -103,6 +103,102 @@ describe("Workflow composer", () => { expect(step2Mock).toHaveBeenCalledTimes(1) expect(step3Mock).toHaveBeenCalledTimes(1) }) + + it("should succeed and pass down the transaction id and event group id when provided from the context", async function () { + let parentContext, childContext + + const childWorkflowStep1 = createStep("step1", async (_, context) => { + childContext = context + return new StepResponse({ result: "step1" }) + }) + const childWorkflowStep2 = createStep( + "step2", + async (input: string, context) => { + return new StepResponse({ result: input }) + } + ) + const step1 = createStep("step3", async (input: string, context) => { + parentContext = context + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow( + getNewWorkflowId(), + function (input: WorkflowData) { + childWorkflowStep1() + return childWorkflowStep2(input) + } + ) + + const workflow = createWorkflow(getNewWorkflowId(), function () { + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + return step1(subWorkflowRes.result) + }) + + const { result } = await workflow.run({ + input: {}, + context: { + eventGroupId: "eventGroupId", + transactionId: "transactionId", + }, + }) + + expect(result).toEqual({ result: "hi from outside" }) + + expect(parentContext.transactionId).toEqual("transactionId") + expect(parentContext.transactionId).toEqual(childContext.transactionId) + + expect(parentContext.eventGroupId).toEqual("eventGroupId") + expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId) + }) + + it("should succeed and pass down the transaction id and event group id when not provided from the context", async function () { + let parentContext, childContext + + const childWorkflowStep1 = createStep("step1", async (_, context) => { + childContext = context + return new StepResponse({ result: "step1" }) + }) + const childWorkflowStep2 = createStep( + "step2", + async (input: string, context) => { + return new StepResponse({ result: input }) + } + ) + const step1 = createStep("step3", async (input: string, context) => { + parentContext = context + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow( + getNewWorkflowId(), + function (input: WorkflowData) { + childWorkflowStep1() + return childWorkflowStep2(input) + } + ) + + const workflow = createWorkflow(getNewWorkflowId(), function () { + const subWorkflowRes = subWorkflow.runAsStep({ + input: "hi from outside", + }) + return step1(subWorkflowRes.result) + }) + + const { result } = await workflow.run({ + input: {}, + }) + + expect(result).toEqual({ result: "hi from outside" }) + + expect(parentContext.transactionId).toBeTruthy() + expect(parentContext.transactionId).toEqual(childContext.transactionId) + + expect(parentContext.eventGroupId).toBeTruthy() + expect(parentContext.eventGroupId).toEqual(childContext.eventGroupId) + }) }) it("should not throw an unhandled error on failed transformer resolution after a step fail, but should rather push the errors in the errors result", async function () { diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 10dd2ce961..47d0a223d1 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,9 +4,9 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { OrchestrationUtils, deepCopy, isString } from "@medusajs/utils" +import { deepCopy, isString, OrchestrationUtils } from "@medusajs/utils" import { ulid } from "ulid" -import { StepResponse, resolveValue } from "./helpers" +import { resolveValue, StepResponse } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -135,6 +135,8 @@ function applyStep< attempt: metadata.attempt, container: stepArguments.container, metadata, + eventGroupId: stepArguments.context!.eventGroupId, + transactionId: stepArguments.context!.transactionId, context: stepArguments.context!, } diff --git a/packages/core/workflows-sdk/src/utils/composer/type.ts b/packages/core/workflows-sdk/src/utils/composer/type.ts index 31fa467ce8..5aa74473d4 100644 --- a/packages/core/workflows-sdk/src/utils/composer/type.ts +++ b/packages/core/workflows-sdk/src/utils/composer/type.ts @@ -131,6 +131,14 @@ export interface StepExecutionContext { * {@inheritDoc types!Context} */ context: Context + /** + * A string indicating the ID of the group to aggregate the events to be emitted at a later point. + */ + eventGroupId?: string + /** + * A string indicating the ID of the current transaction. + */ + transactionId?: string } export type WorkflowTransactionContext = StepExecutionContext &