feat: add tracing to workflow steps (#9140)

This commit is contained in:
Harminder Virk
2024-09-17 11:35:38 +05:30
committed by GitHub
parent aa0928d437
commit 653a4bff71
4 changed files with 178 additions and 37 deletions

View File

@@ -52,6 +52,35 @@ export class TransactionOrchestrator extends EventEmitter {
return this.workflowOptions[modelId]
}
/**
* Trace workflow transaction for instrumentation
*/
static traceTransaction?: (
transactionResume: (...args: any[]) => Promise<void>,
metadata: {
model_id: string
transaction_id: string
flow_metadata: TransactionFlow["metadata"]
}
) => Promise<any>
/**
* Trace workflow steps for instrumentation
*/
static traceStep?: (
handler: (...args: any[]) => Promise<any>,
metadata: {
action: string
type: "invoke" | "compensate"
step_id: string
step_uuid: string
attempts: number
failures: number
async: boolean
idempotency_key: string
}
) => Promise<any>
constructor({
id,
definition,
@@ -730,18 +759,44 @@ export class TransactionOrchestrator extends EventEmitter {
}
}
const traceData = {
action: step.definition.action + "",
type,
step_id: step.id,
step_uuid: step.uuid + "",
attempts: step.attempts,
failures: step.failures,
async: !!(type === "invoke"
? step.definition.async
: step.definition.compensateAsync),
idempotency_key: payload.metadata.idempotency_key,
}
const handlerArgs = [
step.definition.action + "",
type,
payload,
transaction,
step,
this,
] as Parameters<TransactionStepHandler>
if (!isAsync) {
hasSyncSteps = true
const stepHandler = async () => {
return await transaction.handler(...handlerArgs)
}
let promise: Promise<unknown>
if (TransactionOrchestrator.traceStep) {
promise = TransactionOrchestrator.traceStep(stepHandler, traceData)
} else {
promise = stepHandler()
}
execution.push(
transaction
.handler(
step.definition.action + "",
type,
payload,
transaction,
step,
this
)
promise
.then(async (response: any) => {
if (this.hasExpired({ transaction, step }, Date.now())) {
await this.checkStepTimeout(transaction, step)
@@ -783,17 +838,26 @@ export class TransactionOrchestrator extends EventEmitter {
})
)
} else {
const stepHandler = async () => {
return await transaction.handler(...handlerArgs)
}
execution.push(
transaction.saveCheckpoint().then(() => {
transaction
.handler(
step.definition.action + "",
type,
payload,
transaction,
step,
this
let promise: Promise<unknown>
if (TransactionOrchestrator.traceStep) {
promise = TransactionOrchestrator.traceStep(
stepHandler,
traceData
)
} else {
promise = stepHandler()
}
// TODO discussion why do we not await here, adding an await I wouldnt expect the test to fail but it does, maybe we should split the test to also test after everything is executed?
// cc test from engine redis
promise
.then(async (response: any) => {
if (
!step.definition.backgroundExecution ||
@@ -877,28 +941,46 @@ export class TransactionOrchestrator extends EventEmitter {
return
}
const flow = transaction.getFlow()
const executeNext = async () => {
const flow = transaction.getFlow()
if (flow.state === TransactionState.NOT_STARTED) {
flow.state = TransactionState.INVOKING
flow.startedAt = Date.now()
if (flow.state === TransactionState.NOT_STARTED) {
flow.state = TransactionState.INVOKING
flow.startedAt = Date.now()
if (this.getOptions().store) {
await transaction.saveCheckpoint(
flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL
)
if (this.getOptions().store) {
await transaction.saveCheckpoint(
flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL
)
}
if (transaction.hasTimeout()) {
await transaction.scheduleTransactionTimeout(
transaction.getTimeout()!
)
}
this.emit(DistributedTransactionEvent.BEGIN, { transaction })
} else {
this.emit(DistributedTransactionEvent.RESUME, { transaction })
}
if (transaction.hasTimeout()) {
await transaction.scheduleTransactionTimeout(transaction.getTimeout()!)
}
this.emit(DistributedTransactionEvent.BEGIN, { transaction })
} else {
this.emit(DistributedTransactionEvent.RESUME, { transaction })
return await this.executeNext(transaction)
}
await this.executeNext(transaction)
if (
TransactionOrchestrator.traceTransaction &&
!transaction.getFlow().hasAsyncSteps
) {
await TransactionOrchestrator.traceTransaction(executeNext, {
model_id: transaction.modelId,
transaction_id: transaction.transactionId,
flow_metadata: transaction.getFlow().metadata,
})
return
}
await executeNext()
}
/**

View File

@@ -4,7 +4,7 @@ import {
WorkflowStepHandler,
WorkflowStepHandlerArguments,
} from "@medusajs/orchestration"
import { OrchestrationUtils, isString } from "@medusajs/utils"
import { isString, OrchestrationUtils } from "@medusajs/utils"
import { ulid } from "ulid"
import { StepResponse, resolveValue } from "./helpers"
import { createStepHandler } from "./helpers/create-step-handler"

View File

@@ -11,6 +11,7 @@ import {
import { snakeCase } from "lodash"
import start from "../commands/start"
import { TransactionOrchestrator } from "@medusajs/orchestration"
const EXCLUDED_RESOURCES = [".vite", "virtual:"]
@@ -21,7 +22,7 @@ function shouldExcludeResource(resource: string) {
}
/**
* Instrumenting the first touch point of the HTTP layer to report traces to
* Instrument the first touch point of the HTTP layer to report traces to
* OpenTelemetry
*/
export function instrumentHttpLayer() {
@@ -125,7 +126,7 @@ export function instrumentHttpLayer() {
}
/**
* Instrumenting the queries made using the remote query
* Instrument the queries made using the remote query
*/
export function instrumentRemoteQuery() {
const QueryTracer = new Tracer("@medusajs/query", "2.0.0")
@@ -203,6 +204,46 @@ export function instrumentRemoteQuery() {
})
}
/**
* Instrument the workflows and steps execution
*/
export function instrumentWorkflows() {
const WorkflowsTracer = new Tracer("@medusajs/workflows-sdk", "2.0.0")
TransactionOrchestrator.traceTransaction = async (
transactionResumeFn,
metadata
) => {
return await WorkflowsTracer.trace(
`workflow:${snakeCase(metadata.model_id)}`,
async function (span) {
span.setAttribute("workflow.transaction_id", metadata.transaction_id)
if (metadata.flow_metadata) {
Object.entries(metadata.flow_metadata).forEach(([key, value]) => {
span.setAttribute(`workflow.flow_metadata.${key}`, value as string)
})
}
return await transactionResumeFn().finally(() => span.end())
}
)
}
TransactionOrchestrator.traceStep = async (stepHandler, metadata) => {
return await WorkflowsTracer.trace(
`step:${snakeCase(metadata.action)}:${metadata.type}`,
async function (span) {
Object.entries(metadata).forEach(([key, value]) => {
span.setAttribute(`workflow.step.${key}`, value)
})
return await stepHandler().finally(() => span.end())
}
)
}
}
/**
* A helper function to configure the OpenTelemetry SDK with some defaults.
* For better/more control, please configure the SDK manually.
@@ -219,6 +260,11 @@ export function instrumentRemoteQuery() {
export function registerOtel(options: {
serviceName: string
exporter: SpanExporter
instrument?: Partial<{
http: boolean
remoteQuery: boolean
workflows: boolean
}>
instrumentations?: Instrumentation[]
}) {
const sdk = new NodeSDK({
@@ -232,6 +278,18 @@ export function registerOtel(options: {
...(options.instrumentations || []),
],
})
const instrument = options.instrument || {}
if (instrument.http) {
instrumentHttpLayer()
}
if (instrument.remoteQuery) {
instrumentRemoteQuery()
}
if (instrument.workflows) {
instrumentWorkflows()
}
sdk.start()
return sdk
}

View File

@@ -1,4 +1,3 @@
import { createDefaultsWorkflow } from "@medusajs/core-flows"
import { ConfigModule, MedusaContainer, PluginDetails } from "@medusajs/types"
import { ContainerRegistrationKeys, promiseAll } from "@medusajs/utils"
import { asValue } from "awilix"
@@ -164,6 +163,8 @@ export default async ({
expressApp,
rootDirectory
)
const { createDefaultsWorkflow } = await import("@medusajs/core-flows")
await createDefaultsWorkflow(container).run()
await onApplicationStart()