feature: add tracing to remote query (#9128)
This commit is contained in:
@@ -20,6 +20,38 @@ import {
|
||||
export class Query {
|
||||
#remoteQuery: RemoteQuery
|
||||
|
||||
/**
|
||||
* Method to wrap execution of the graph query for instrumentation
|
||||
*/
|
||||
static traceGraphQuery?: (
|
||||
queryFn: () => Promise<any>,
|
||||
queryOptions: RemoteQueryObjectConfig<any>
|
||||
) => Promise<any>
|
||||
|
||||
/**
|
||||
* Method to wrap execution of the remoteQuery overload function
|
||||
* for instrumentation
|
||||
*/
|
||||
static traceRemoteQuery?: (
|
||||
queryFn: () => Promise<any>,
|
||||
queryOptions:
|
||||
| RemoteQueryObjectConfig<any>
|
||||
| RemoteQueryObjectFromStringResult<any>
|
||||
| RemoteJoinerQuery
|
||||
) => Promise<any>
|
||||
|
||||
static instrument = {
|
||||
graphQuery(tracer: (typeof Query)["traceGraphQuery"]) {
|
||||
Query.traceGraphQuery = tracer
|
||||
},
|
||||
remoteQuery(tracer: (typeof Query)["traceRemoteQuery"]) {
|
||||
Query.traceRemoteQuery = tracer
|
||||
},
|
||||
remoteDataFetch(tracer: (typeof RemoteQuery)["traceFetchRemoteData"]) {
|
||||
RemoteQuery.traceFetchRemoteData = tracer
|
||||
},
|
||||
}
|
||||
|
||||
constructor(remoteQuery: RemoteQuery) {
|
||||
this.#remoteQuery = remoteQuery
|
||||
}
|
||||
@@ -76,6 +108,13 @@ export class Query {
|
||||
}
|
||||
|
||||
const config = this.#unwrapQueryConfig(queryOptions)
|
||||
if (Query.traceRemoteQuery) {
|
||||
return await Query.traceRemoteQuery(
|
||||
async () => this.#remoteQuery.query(config, undefined, options),
|
||||
queryOptions
|
||||
)
|
||||
}
|
||||
|
||||
return await this.#remoteQuery.query(config, undefined, options)
|
||||
}
|
||||
|
||||
@@ -98,11 +137,27 @@ export class Query {
|
||||
options?: RemoteJoinerOptions
|
||||
): Promise<GraphResultSet<TEntry>> {
|
||||
const normalizedQuery = remoteQueryObjectFromString(queryOptions).__value
|
||||
const response = await this.#remoteQuery.query(
|
||||
normalizedQuery,
|
||||
undefined,
|
||||
options
|
||||
)
|
||||
let response:
|
||||
| any[]
|
||||
| { rows: any[]; metadata: RemoteQueryFunctionReturnPagination }
|
||||
|
||||
/**
|
||||
* When traceGraphQuery method is defined, we will wrap the implementation
|
||||
* inside a callback and provide the method to the traceGraphQuery
|
||||
*/
|
||||
if (Query.traceGraphQuery) {
|
||||
response = await Query.traceGraphQuery(
|
||||
async () =>
|
||||
this.#remoteQuery.query(normalizedQuery, undefined, options),
|
||||
queryOptions
|
||||
)
|
||||
} else {
|
||||
response = await this.#remoteQuery.query(
|
||||
normalizedQuery,
|
||||
undefined,
|
||||
options
|
||||
)
|
||||
}
|
||||
|
||||
return this.#unwrapRemoteQueryResponse(response)
|
||||
}
|
||||
|
||||
@@ -22,6 +22,13 @@ export class RemoteQuery {
|
||||
private modulesMap: Map<string, LoadedModule> = new Map()
|
||||
private customRemoteFetchData?: RemoteFetchDataCallback
|
||||
|
||||
static traceFetchRemoteData?: (
|
||||
fetcher: () => Promise<any>,
|
||||
serviceName: string,
|
||||
method: string,
|
||||
options: { select?: string[]; relations: string[] }
|
||||
) => Promise<any>
|
||||
|
||||
constructor({
|
||||
modulesLoaded,
|
||||
customRemoteFetchData,
|
||||
@@ -227,7 +234,17 @@ export class RemoteQuery {
|
||||
options.take = null
|
||||
}
|
||||
|
||||
const result = await service[methodName](filters, options)
|
||||
let result: any
|
||||
if (RemoteQuery.traceFetchRemoteData) {
|
||||
result = await RemoteQuery.traceFetchRemoteData(
|
||||
async () => service[methodName](filters, options),
|
||||
serviceConfig.serviceName,
|
||||
methodName,
|
||||
options
|
||||
)
|
||||
} else {
|
||||
result = await service[methodName](filters, options)
|
||||
}
|
||||
|
||||
if (hasPagination) {
|
||||
const [data, count] = result
|
||||
|
||||
@@ -13,4 +13,4 @@ export * from "./telemetry"
|
||||
|
||||
export const MEDUSA_CLI_PATH = require.resolve("@medusajs/medusa-cli")
|
||||
|
||||
export { GraphQLSchema, gqlSchemaToTypes } from "@medusajs/modules-sdk"
|
||||
export { GraphQLSchema, gqlSchemaToTypes, Query } from "@medusajs/modules-sdk"
|
||||
|
||||
@@ -2,7 +2,7 @@ import { snakeCase } from "lodash"
|
||||
import { NodeSDK } from "@opentelemetry/sdk-node"
|
||||
import { Resource } from "@opentelemetry/resources"
|
||||
import { SpanStatusCode } from "@opentelemetry/api"
|
||||
import { RoutesLoader, Tracer } from "@medusajs/framework"
|
||||
import { RoutesLoader, Tracer, Query } from "@medusajs/framework"
|
||||
import {
|
||||
type SpanExporter,
|
||||
SimpleSpanProcessor,
|
||||
@@ -21,7 +21,7 @@ function shouldExcludeResource(resource: string) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Instrumenting the first touch point of the Http layer to report traces to
|
||||
* Instrumenting the first touch point of the HTTP layer to report traces to
|
||||
* OpenTelemetry
|
||||
*/
|
||||
export function instrumentHttpLayer() {
|
||||
@@ -43,6 +43,12 @@ export function instrumentHttpLayer() {
|
||||
try {
|
||||
await requestHandler()
|
||||
} finally {
|
||||
if (res.statusCode >= 500) {
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: `Failed with ${res.statusMessage}`,
|
||||
})
|
||||
}
|
||||
span.setAttributes({ "http.statusCode": res.statusCode })
|
||||
span.end()
|
||||
}
|
||||
@@ -71,6 +77,7 @@ export function instrumentHttpLayer() {
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error.message || "Failed",
|
||||
})
|
||||
throw error
|
||||
} finally {
|
||||
span.end()
|
||||
}
|
||||
@@ -117,6 +124,85 @@ export function instrumentHttpLayer() {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Instrumenting the queries made using the remote query
|
||||
*/
|
||||
export function instrumentRemoteQuery() {
|
||||
const QueryTracer = new Tracer("@medusajs/query", "2.0.0")
|
||||
|
||||
Query.instrument.graphQuery(async function (queryFn, queryOptions) {
|
||||
return await QueryTracer.trace(
|
||||
`query.graph: ${queryOptions.entryPoint}`,
|
||||
async (span) => {
|
||||
span.setAttributes({
|
||||
"query.fields": queryOptions.fields,
|
||||
})
|
||||
return await queryFn()
|
||||
.catch((error) => {
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error.message,
|
||||
})
|
||||
})
|
||||
.finally(() => span.end())
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
Query.instrument.remoteQuery(async function (queryFn, queryOptions) {
|
||||
const traceIdentifier =
|
||||
"entryPoint" in queryOptions
|
||||
? queryOptions.entryPoint
|
||||
: "service" in queryOptions
|
||||
? queryOptions.service
|
||||
: "__value" in queryOptions
|
||||
? Object.keys(queryOptions.__value)[0]
|
||||
: "unknown source"
|
||||
|
||||
return await QueryTracer.trace(
|
||||
`remoteQuery: ${traceIdentifier}`,
|
||||
async (span) => {
|
||||
span.setAttributes({
|
||||
"query.fields": "fields" in queryOptions ? queryOptions.fields : [],
|
||||
})
|
||||
return await queryFn()
|
||||
.catch((error) => {
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error.message,
|
||||
})
|
||||
})
|
||||
.finally(() => span.end())
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
Query.instrument.remoteDataFetch(async function (
|
||||
fetchFn,
|
||||
serviceName,
|
||||
method,
|
||||
options
|
||||
) {
|
||||
return await QueryTracer.trace(
|
||||
`${snakeCase(serviceName)}.${snakeCase(method)}`,
|
||||
async (span) => {
|
||||
span.setAttributes({
|
||||
"fetch.select": options.select,
|
||||
"fetch.relations": options.relations,
|
||||
})
|
||||
return await fetchFn()
|
||||
.catch((error) => {
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error.message,
|
||||
})
|
||||
})
|
||||
.finally(() => span.end())
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper function to configure the OpenTelemetry SDK with some defaults.
|
||||
* For better/more control, please configure the SDK manually.
|
||||
|
||||
Reference in New Issue
Block a user