From 653a4bff710be583156c040c3552ab0162042802 Mon Sep 17 00:00:00 2001 From: Harminder Virk Date: Tue, 17 Sep 2024 11:35:38 +0530 Subject: [PATCH] feat: add tracing to workflow steps (#9140) --- .../transaction/transaction-orchestrator.ts | 148 ++++++++++++++---- .../src/utils/composer/create-step.ts | 2 +- packages/medusa/src/instrumentation/index.ts | 62 +++++++- packages/medusa/src/loaders/index.ts | 3 +- 4 files changed, 178 insertions(+), 37 deletions(-) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 82b26281be..2cf277c778 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -52,6 +52,35 @@ export class TransactionOrchestrator extends EventEmitter { return this.workflowOptions[modelId] } + /** + * Trace workflow transaction for instrumentation + */ + static traceTransaction?: ( + transactionResume: (...args: any[]) => Promise, + metadata: { + model_id: string + transaction_id: string + flow_metadata: TransactionFlow["metadata"] + } + ) => Promise + + /** + * Trace workflow steps for instrumentation + */ + static traceStep?: ( + handler: (...args: any[]) => Promise, + metadata: { + action: string + type: "invoke" | "compensate" + step_id: string + step_uuid: string + attempts: number + failures: number + async: boolean + idempotency_key: string + } + ) => Promise + constructor({ id, definition, @@ -730,18 +759,44 @@ export class TransactionOrchestrator extends EventEmitter { } } + const traceData = { + action: step.definition.action + "", + type, + step_id: step.id, + step_uuid: step.uuid + "", + attempts: step.attempts, + failures: step.failures, + async: !!(type === "invoke" + ? step.definition.async + : step.definition.compensateAsync), + idempotency_key: payload.metadata.idempotency_key, + } + + const handlerArgs = [ + step.definition.action + "", + type, + payload, + transaction, + step, + this, + ] as Parameters + if (!isAsync) { hasSyncSteps = true + + const stepHandler = async () => { + return await transaction.handler(...handlerArgs) + } + + let promise: Promise + if (TransactionOrchestrator.traceStep) { + promise = TransactionOrchestrator.traceStep(stepHandler, traceData) + } else { + promise = stepHandler() + } + execution.push( - transaction - .handler( - step.definition.action + "", - type, - payload, - transaction, - step, - this - ) + promise .then(async (response: any) => { if (this.hasExpired({ transaction, step }, Date.now())) { await this.checkStepTimeout(transaction, step) @@ -783,17 +838,26 @@ export class TransactionOrchestrator extends EventEmitter { }) ) } else { + const stepHandler = async () => { + return await transaction.handler(...handlerArgs) + } + execution.push( transaction.saveCheckpoint().then(() => { - transaction - .handler( - step.definition.action + "", - type, - payload, - transaction, - step, - this + let promise: Promise + + if (TransactionOrchestrator.traceStep) { + promise = TransactionOrchestrator.traceStep( + stepHandler, + traceData ) + } else { + promise = stepHandler() + } + + // TODO discussion why do we not await here, adding an await I wouldnt expect the test to fail but it does, maybe we should split the test to also test after everything is executed? + // cc test from engine redis + promise .then(async (response: any) => { if ( !step.definition.backgroundExecution || @@ -877,28 +941,46 @@ export class TransactionOrchestrator extends EventEmitter { return } - const flow = transaction.getFlow() + const executeNext = async () => { + const flow = transaction.getFlow() - if (flow.state === TransactionState.NOT_STARTED) { - flow.state = TransactionState.INVOKING - flow.startedAt = Date.now() + if (flow.state === TransactionState.NOT_STARTED) { + flow.state = TransactionState.INVOKING + flow.startedAt = Date.now() - if (this.getOptions().store) { - await transaction.saveCheckpoint( - flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL - ) + if (this.getOptions().store) { + await transaction.saveCheckpoint( + flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL + ) + } + + if (transaction.hasTimeout()) { + await transaction.scheduleTransactionTimeout( + transaction.getTimeout()! + ) + } + + this.emit(DistributedTransactionEvent.BEGIN, { transaction }) + } else { + this.emit(DistributedTransactionEvent.RESUME, { transaction }) } - if (transaction.hasTimeout()) { - await transaction.scheduleTransactionTimeout(transaction.getTimeout()!) - } - - this.emit(DistributedTransactionEvent.BEGIN, { transaction }) - } else { - this.emit(DistributedTransactionEvent.RESUME, { transaction }) + return await this.executeNext(transaction) } - await this.executeNext(transaction) + if ( + TransactionOrchestrator.traceTransaction && + !transaction.getFlow().hasAsyncSteps + ) { + await TransactionOrchestrator.traceTransaction(executeNext, { + model_id: transaction.modelId, + transaction_id: transaction.transactionId, + flow_metadata: transaction.getFlow().metadata, + }) + return + } + + await executeNext() } /** diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 2e6955b7c2..2aaf0d04da 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -4,7 +4,7 @@ import { WorkflowStepHandler, WorkflowStepHandlerArguments, } from "@medusajs/orchestration" -import { OrchestrationUtils, isString } from "@medusajs/utils" +import { isString, OrchestrationUtils } from "@medusajs/utils" import { ulid } from "ulid" import { StepResponse, resolveValue } from "./helpers" import { createStepHandler } from "./helpers/create-step-handler" diff --git a/packages/medusa/src/instrumentation/index.ts b/packages/medusa/src/instrumentation/index.ts index e32989f649..2207cb91e2 100644 --- a/packages/medusa/src/instrumentation/index.ts +++ b/packages/medusa/src/instrumentation/index.ts @@ -11,6 +11,7 @@ import { import { snakeCase } from "lodash" import start from "../commands/start" +import { TransactionOrchestrator } from "@medusajs/orchestration" const EXCLUDED_RESOURCES = [".vite", "virtual:"] @@ -21,7 +22,7 @@ function shouldExcludeResource(resource: string) { } /** - * Instrumenting the first touch point of the HTTP layer to report traces to + * Instrument the first touch point of the HTTP layer to report traces to * OpenTelemetry */ export function instrumentHttpLayer() { @@ -125,7 +126,7 @@ export function instrumentHttpLayer() { } /** - * Instrumenting the queries made using the remote query + * Instrument the queries made using the remote query */ export function instrumentRemoteQuery() { const QueryTracer = new Tracer("@medusajs/query", "2.0.0") @@ -203,6 +204,46 @@ export function instrumentRemoteQuery() { }) } +/** + * Instrument the workflows and steps execution + */ +export function instrumentWorkflows() { + const WorkflowsTracer = new Tracer("@medusajs/workflows-sdk", "2.0.0") + + TransactionOrchestrator.traceTransaction = async ( + transactionResumeFn, + metadata + ) => { + return await WorkflowsTracer.trace( + `workflow:${snakeCase(metadata.model_id)}`, + async function (span) { + span.setAttribute("workflow.transaction_id", metadata.transaction_id) + + if (metadata.flow_metadata) { + Object.entries(metadata.flow_metadata).forEach(([key, value]) => { + span.setAttribute(`workflow.flow_metadata.${key}`, value as string) + }) + } + + return await transactionResumeFn().finally(() => span.end()) + } + ) + } + + TransactionOrchestrator.traceStep = async (stepHandler, metadata) => { + return await WorkflowsTracer.trace( + `step:${snakeCase(metadata.action)}:${metadata.type}`, + async function (span) { + Object.entries(metadata).forEach(([key, value]) => { + span.setAttribute(`workflow.step.${key}`, value) + }) + + return await stepHandler().finally(() => span.end()) + } + ) + } +} + /** * A helper function to configure the OpenTelemetry SDK with some defaults. * For better/more control, please configure the SDK manually. @@ -219,6 +260,11 @@ export function instrumentRemoteQuery() { export function registerOtel(options: { serviceName: string exporter: SpanExporter + instrument?: Partial<{ + http: boolean + remoteQuery: boolean + workflows: boolean + }> instrumentations?: Instrumentation[] }) { const sdk = new NodeSDK({ @@ -232,6 +278,18 @@ export function registerOtel(options: { ...(options.instrumentations || []), ], }) + + const instrument = options.instrument || {} + if (instrument.http) { + instrumentHttpLayer() + } + if (instrument.remoteQuery) { + instrumentRemoteQuery() + } + if (instrument.workflows) { + instrumentWorkflows() + } + sdk.start() return sdk } diff --git a/packages/medusa/src/loaders/index.ts b/packages/medusa/src/loaders/index.ts index cf3c9b01b4..e02e068b9f 100644 --- a/packages/medusa/src/loaders/index.ts +++ b/packages/medusa/src/loaders/index.ts @@ -1,4 +1,3 @@ -import { createDefaultsWorkflow } from "@medusajs/core-flows" import { ConfigModule, MedusaContainer, PluginDetails } from "@medusajs/types" import { ContainerRegistrationKeys, promiseAll } from "@medusajs/utils" import { asValue } from "awilix" @@ -164,6 +163,8 @@ export default async ({ expressApp, rootDirectory ) + + const { createDefaultsWorkflow } = await import("@medusajs/core-flows") await createDefaultsWorkflow(container).run() await onApplicationStart()