From c0ca00290106fbdc8e15077bc8d1c3eafbef59f2 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Tue, 8 Aug 2023 08:06:47 -0300 Subject: [PATCH] feat(orchestration,workflows): pipe oncomplete and workflow preparation (#4697) * chore: pipe onComplete and workflow preparation step * changeset * fix: tests --------- Co-authored-by: Adrien de Peretti Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com> --- .changeset/weak-berries-flow.md | 6 ++ .../transaction/transaction-orchestrator.ts | 29 +++++++++ .../transaction/distributed-transaction.ts | 7 +- .../transaction/transaction-orchestrator.ts | 7 +- .../src/transaction/transaction-step.ts | 14 ++-- .../src/workflow/workflow-manager.ts | 6 +- packages/workflows/package.json | 2 +- .../src/helper/__tests__/pipe.spec.ts | 40 ++++++++++++ .../helper/__tests__/workflow-export.spec.ts | 64 +++++++++++++++++++ packages/workflows/src/helper/pipe.ts | 31 ++++++++- .../workflows/src/helper/workflow-export.ts | 19 +++++- 11 files changed, 205 insertions(+), 20 deletions(-) create mode 100644 .changeset/weak-berries-flow.md create mode 100644 packages/workflows/src/helper/__tests__/workflow-export.spec.ts diff --git a/.changeset/weak-berries-flow.md b/.changeset/weak-berries-flow.md new file mode 100644 index 0000000000..b1dc751f71 --- /dev/null +++ b/.changeset/weak-berries-flow.md @@ -0,0 +1,6 @@ +--- +"@medusajs/orchestration": minor +"@medusajs/workflows": minor +--- + +Add pipe onComplete callback and preparation function to exportsWorkflow diff --git a/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts index 6fe90e082c..053f706a81 100644 --- a/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/__tests__/transaction/transaction-orchestrator.ts @@ -1,4 +1,5 @@ import { + DistributedTransaction, TransactionHandlerType, TransactionOrchestrator, TransactionPayload, @@ -858,4 +859,32 @@ describe("Transaction Orchestrator", () => { expect(mocks.oneCompensate).toBeCalledTimes(1) expect(mocks.twoCompensate).toBeCalledTimes(1) }) + + it("Should receive the current transaction as reference in the handler", async () => { + let transactionInHandler + + async function handler( + actionId: string, + functionHandlerType: TransactionHandlerType, + payload: TransactionPayload, + transaction?: DistributedTransaction + ) { + transactionInHandler = transaction + } + + const strategy = new TransactionOrchestrator("transaction-name", { + next: { + action: "firstMethod", + }, + }) + + const transaction = await strategy.beginTransaction( + "transaction_id_123", + handler + ) + + await strategy.resume(transaction) + + expect(transaction).toBe(transactionInHandler) + }) }) diff --git a/packages/orchestration/src/transaction/distributed-transaction.ts b/packages/orchestration/src/transaction/distributed-transaction.ts index 834a53f927..47bd5f1b4d 100644 --- a/packages/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/orchestration/src/transaction/distributed-transaction.ts @@ -1,5 +1,6 @@ import { isDefined } from "@medusajs/utils" import { TransactionFlow } from "./transaction-orchestrator" +import { TransactionStepHandler } from "./transaction-step" import { TransactionHandlerType, TransactionState } from "./types" /** @@ -79,11 +80,7 @@ export class DistributedTransaction { constructor( private flow: TransactionFlow, - public handler: ( - actionId: string, - handlerType: TransactionHandlerType, - payload: TransactionPayload - ) => Promise, + public handler: TransactionStepHandler, public payload?: any, errors?: TransactionStepError[], context?: TransactionContext diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 7729d5f589..81503596df 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -3,14 +3,13 @@ import { TransactionCheckpoint, TransactionPayload, } from "./distributed-transaction" +import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { TransactionHandlerType, - TransactionModel, TransactionState, TransactionStepStatus, TransactionStepsDefinition, } from "./types" -import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { EventEmitter } from "events" @@ -366,7 +365,7 @@ export class TransactionOrchestrator extends EventEmitter { if (!step.definition.async) { execution.push( transaction - .handler(step.definition.action + "", type, payload) + .handler(step.definition.action + "", type, payload, transaction) .then(async (response) => { await TransactionOrchestrator.setStepSuccess( transaction, @@ -387,7 +386,7 @@ export class TransactionOrchestrator extends EventEmitter { execution.push( transaction.saveCheckpoint().then(async () => transaction - .handler(step.definition.action + "", type, payload) + .handler(step.definition.action + "", type, payload, transaction) .catch(async (error) => { await TransactionOrchestrator.setStepFailure( transaction, diff --git a/packages/orchestration/src/transaction/transaction-step.ts b/packages/orchestration/src/transaction/transaction-step.ts index 6fe1d77975..72f63e9d25 100644 --- a/packages/orchestration/src/transaction/transaction-step.ts +++ b/packages/orchestration/src/transaction/transaction-step.ts @@ -1,15 +1,19 @@ -import { TransactionPayload } from "./distributed-transaction" import { - TransactionStepsDefinition, - TransactionStepStatus, - TransactionState, + DistributedTransaction, + TransactionPayload, +} from "./distributed-transaction" +import { TransactionHandlerType, + TransactionState, + TransactionStepStatus, + TransactionStepsDefinition, } from "./types" export type TransactionStepHandler = ( actionId: string, handlerType: TransactionHandlerType, - payload: TransactionPayload + payload: TransactionPayload, + transaction?: DistributedTransaction ) => Promise /** diff --git a/packages/orchestration/src/workflow/workflow-manager.ts b/packages/orchestration/src/workflow/workflow-manager.ts index 502b15c672..7cf49251e8 100644 --- a/packages/orchestration/src/workflow/workflow-manager.ts +++ b/packages/orchestration/src/workflow/workflow-manager.ts @@ -1,5 +1,6 @@ import { Context, MedusaContainer } from "@medusajs/types" import { + DistributedTransaction, OrchestratorBuilder, TransactionHandlerType, TransactionMetadata, @@ -35,6 +36,7 @@ export type WorkflowStepHandler = (args: { invoke: { [actions: string]: unknown } compensate: { [actions: string]: unknown } metadata: TransactionMetadata + transaction: DistributedTransaction context?: Context }) => unknown @@ -136,7 +138,8 @@ export class WorkflowManager { return async ( actionId: string, handlerType: TransactionHandlerType, - payload?: any + payload?: any, + transaction?: DistributedTransaction ) => { const command = handlers.get(actionId) @@ -157,6 +160,7 @@ export class WorkflowManager { invoke, compensate, metadata, + transaction: transaction as DistributedTransaction, context, }) } diff --git a/packages/workflows/package.json b/packages/workflows/package.json index 239cc5756f..8f1b514282 100644 --- a/packages/workflows/package.json +++ b/packages/workflows/package.json @@ -36,6 +36,6 @@ "prepare": "cross-env NODE_ENV=production yarn run build", "build": "rimraf dist && tsc --build", "watch": "tsc --build --watch", - "test": "jest --passWithNoTests" + "test": "jest" } } diff --git a/packages/workflows/src/helper/__tests__/pipe.spec.ts b/packages/workflows/src/helper/__tests__/pipe.spec.ts index 8a7712d6c8..29ca51e34b 100644 --- a/packages/workflows/src/helper/__tests__/pipe.spec.ts +++ b/packages/workflows/src/helper/__tests__/pipe.spec.ts @@ -45,4 +45,44 @@ describe("Pipe", function () { expect(result).toBeDefined() expect(result).toEqual(output) }) + + it("should execute onComplete function if available but the output result shouldn't change", async function () { + const payload = { input: "input" } + const output = { test: "test" } + const invoke = { + input: payload, + } + + const onComplete = jest.fn(async ({ data }) => { + data.__changed = true + + return + }) + + const handler = jest.fn().mockImplementation(async () => output) + const input = { + inputAlias: "payload", + invoke: [ + { + from: "payload", + alias: "input", + }, + ], + onComplete, + } + + const result = await pipe(input, handler)({ invoke, payload } as any) + + expect(handler).toHaveBeenCalled() + expect(handler).toHaveBeenCalledWith( + expect.objectContaining({ + data: { + input: payload, + }, + }) + ) + + expect(onComplete).toHaveBeenCalled() + expect(result).toEqual(output) + }) }) diff --git a/packages/workflows/src/helper/__tests__/workflow-export.spec.ts b/packages/workflows/src/helper/__tests__/workflow-export.spec.ts new file mode 100644 index 0000000000..9a4222555c --- /dev/null +++ b/packages/workflows/src/helper/__tests__/workflow-export.spec.ts @@ -0,0 +1,64 @@ +import { exportWorkflow } from "../workflow-export" + +jest.mock("@medusajs/orchestration", () => { + return { + TransactionHandlerType: { + INVOKE: "invoke", + COMPENSATE: "compensate", + }, + TransactionState: { + FAILED: "failed", + REVERTED: "reverted", + }, + LocalWorkflow: jest.fn(() => { + return { + run: jest.fn(() => { + return { + getErrors: jest.fn(), + getState: jest.fn(() => "done"), + getContext: jest.fn(() => { + return { + invoke: { result_step: "invoke_test" }, + } + }), + } + }), + } + }), + } +}) + +describe("Export Workflow", function () { + it("should prepare the input data before initializing the transaction", async function () { + let transformedInput + const prepare = jest.fn().mockImplementation(async (data) => { + data.__transformed = true + transformedInput = data + + return data + }) + + const work = exportWorkflow("id" as any, "result_step", prepare) + + const wfHandler = work() + + const input = { + test: "payload", + } + + const { result } = await wfHandler.run({ + input, + }) + + expect(input).toEqual({ + test: "payload", + }) + + expect(transformedInput).toEqual({ + test: "payload", + __transformed: true, + }) + + expect(result).toEqual("invoke_test") + }) +}) diff --git a/packages/workflows/src/helper/pipe.ts b/packages/workflows/src/helper/pipe.ts index c7cf22f3b4..a55f221c27 100644 --- a/packages/workflows/src/helper/pipe.ts +++ b/packages/workflows/src/helper/pipe.ts @@ -1,9 +1,10 @@ -import { Context, MedusaContainer, SharedContext } from "@medusajs/types" import { TransactionMetadata, WorkflowStepHandler, } from "@medusajs/orchestration" +import { Context, MedusaContainer, SharedContext } from "@medusajs/types" +import { DistributedTransaction } from "@medusajs/orchestration" import { InputAlias } from "../definitions" export type WorkflowStepMiddlewareReturn = { @@ -20,6 +21,7 @@ interface PipelineInput { inputAlias?: InputAlias | string invoke?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[] compensate?: WorkflowStepMiddlewareInput | WorkflowStepMiddlewareInput[] + onComplete?: (args: WorkflowOnCompleteArguments) => {} } export type WorkflowArguments = { @@ -30,6 +32,15 @@ export type WorkflowArguments = { context: Context | SharedContext } +export type WorkflowOnCompleteArguments = { + container: MedusaContainer + payload: unknown + data: T + metadata: TransactionMetadata + transaction: DistributedTransaction + context: Context | SharedContext +} + export type PipelineHandler = ( args: WorkflowArguments ) => Promise< @@ -48,6 +59,7 @@ export function pipe( invoke, compensate, metadata, + transaction, context, }) => { let data = {} @@ -61,8 +73,9 @@ export function pipe( Object.assign(original.invoke, { [input.inputAlias]: payload }) } - for (const key in input) { - if (!input[key] || key === "inputAlias") { + const dataKeys = ["invoke", "compensate"] + for (const key of dataKeys) { + if (!input[key]) { continue } @@ -111,6 +124,18 @@ export function pipe( finalResult = result } + if (typeof input.onComplete === "function") { + const dataCopy = JSON.parse(JSON.stringify(data)) + await input.onComplete({ + container, + payload, + data: dataCopy, + metadata, + transaction, + context: context as Context, + }) + } + return finalResult } } diff --git a/packages/workflows/src/helper/workflow-export.ts b/packages/workflows/src/helper/workflow-export.ts index cd95087cdb..3b2fc8dcec 100644 --- a/packages/workflows/src/helper/workflow-export.ts +++ b/packages/workflows/src/helper/workflow-export.ts @@ -27,7 +27,8 @@ export type WorkflowResult = { export const exportWorkflow = ( workflowId: Workflows, - defaultResult?: string + defaultResult?: string, + dataPreparation?: (data: TData) => Promise ) => { return function ( container?: LoadedModule[] | MedusaContainer @@ -60,6 +61,22 @@ export const exportWorkflow = ( resultFrom ??= defaultResult throwOnError ??= true + if (typeof dataPreparation === "function") { + try { + const copyInput = JSON.parse(JSON.stringify(input)) + input = await dataPreparation(copyInput as TData) + } catch (err) { + if (throwOnError) { + throw new Error( + `Data preparation failed: ${err.message}${EOL}${err.stack}` + ) + } + return { + errors: [err], + } + } + } + const transaction = await originalRun( context?.transactionId ?? ulid(), input,