feat(workflows-sdk): log on error (#8666)
This commit is contained in:
committed by
GitHub
parent
1be9373290
commit
eb0bfe9f33
@@ -11,6 +11,7 @@ type FlowRunOptions<TData = unknown> = {
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
logOnError?: boolean
|
||||
events?: Record<string, Function>
|
||||
}
|
||||
|
||||
|
||||
@@ -1,47 +1,39 @@
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
import {
|
||||
DistributedTransactionType,
|
||||
DistributedTransactionEvents,
|
||||
DistributedTransactionType,
|
||||
LocalWorkflow,
|
||||
TransactionStepError,
|
||||
} from "@medusajs/orchestration"
|
||||
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
|
||||
|
||||
export type FlowRunOptions<TData = unknown> = {
|
||||
type BaseFlowRunOptions = {
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
logOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type FlowRunOptions<TData = unknown> = BaseFlowRunOptions & {
|
||||
input?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
export type FlowRegisterStepSuccessOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
export type FlowRegisterStepSuccessOptions<TData = unknown> =
|
||||
BaseFlowRunOptions & {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
}
|
||||
|
||||
export type FlowRegisterStepFailureOptions<TData = unknown> = {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
context?: Context
|
||||
resultFrom?: string | string[] | Symbol
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
export type FlowRegisterStepFailureOptions<TData = unknown> =
|
||||
BaseFlowRunOptions & {
|
||||
idempotencyKey: string
|
||||
response?: TData
|
||||
}
|
||||
|
||||
export type FlowCancelOptions = {
|
||||
export type FlowCancelOptions = BaseFlowRunOptions & {
|
||||
transaction?: DistributedTransactionType
|
||||
transactionId?: string
|
||||
context?: Context
|
||||
throwOnError?: boolean
|
||||
events?: DistributedTransactionEvents
|
||||
container?: LoadedModule[] | MedusaContainer
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { MedusaModule } from "@medusajs/modules-sdk"
|
||||
import {
|
||||
DistributedTransactionType,
|
||||
DistributedTransactionEvents,
|
||||
DistributedTransactionType,
|
||||
LocalWorkflow,
|
||||
TransactionHandlerType,
|
||||
TransactionState,
|
||||
@@ -62,6 +62,7 @@ function createContextualWorkflowRunner<
|
||||
method,
|
||||
{
|
||||
throwOnError,
|
||||
logOnError = false,
|
||||
resultFrom,
|
||||
isCancel = false,
|
||||
container: executionContainer,
|
||||
@@ -86,7 +87,7 @@ function createContextualWorkflowRunner<
|
||||
|
||||
const { eventGroupId } = context
|
||||
|
||||
attachOnFinishReleaseEvents(events, eventGroupId!, flow)
|
||||
attachOnFinishReleaseEvents(events, eventGroupId!, flow, { logOnError })
|
||||
|
||||
const flowMetadata = {
|
||||
eventGroupId,
|
||||
@@ -143,21 +144,18 @@ function createContextualWorkflowRunner<
|
||||
}
|
||||
}
|
||||
|
||||
const newRun = async (
|
||||
{
|
||||
input,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRunOptions = {
|
||||
throwOnError: true,
|
||||
resultFrom: defaultResult,
|
||||
}
|
||||
) => {
|
||||
const newRun = async ({
|
||||
input,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRunOptions = {}) => {
|
||||
resultFrom ??= defaultResult
|
||||
throwOnError ??= true
|
||||
logOnError ??= false
|
||||
|
||||
const context = {
|
||||
...outerContext,
|
||||
@@ -185,7 +183,12 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRun,
|
||||
{ throwOnError, resultFrom, container },
|
||||
{
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
logOnError,
|
||||
},
|
||||
context.transactionId,
|
||||
input,
|
||||
context,
|
||||
@@ -200,17 +203,18 @@ function createContextualWorkflowRunner<
|
||||
idempotencyKey,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRegisterStepSuccessOptions = {
|
||||
idempotencyKey: "",
|
||||
throwOnError: true,
|
||||
resultFrom: defaultResult,
|
||||
}
|
||||
) => {
|
||||
idempotencyKey ??= ""
|
||||
resultFrom ??= defaultResult
|
||||
throwOnError ??= true
|
||||
logOnError ??= false
|
||||
|
||||
const [, transactionId] = idempotencyKey.split(":")
|
||||
const context = {
|
||||
@@ -223,7 +227,12 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRegisterStepSuccess,
|
||||
{ throwOnError, resultFrom, container },
|
||||
{
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
logOnError,
|
||||
},
|
||||
idempotencyKey,
|
||||
response,
|
||||
context,
|
||||
@@ -238,17 +247,18 @@ function createContextualWorkflowRunner<
|
||||
idempotencyKey,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
events,
|
||||
container,
|
||||
}: FlowRegisterStepFailureOptions = {
|
||||
idempotencyKey: "",
|
||||
throwOnError: true,
|
||||
resultFrom: defaultResult,
|
||||
}
|
||||
) => {
|
||||
idempotencyKey ??= ""
|
||||
resultFrom ??= defaultResult
|
||||
throwOnError ??= true
|
||||
logOnError ??= false
|
||||
|
||||
const [, transactionId] = idempotencyKey.split(":")
|
||||
const context = {
|
||||
@@ -261,7 +271,12 @@ function createContextualWorkflowRunner<
|
||||
|
||||
return await originalExecution(
|
||||
originalRegisterStepFailure,
|
||||
{ throwOnError, resultFrom, container },
|
||||
{
|
||||
throwOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
logOnError,
|
||||
},
|
||||
idempotencyKey,
|
||||
response,
|
||||
context,
|
||||
@@ -270,19 +285,17 @@ function createContextualWorkflowRunner<
|
||||
}
|
||||
flow.registerStepFailure = newRegisterStepFailure as any
|
||||
|
||||
const newCancel = async (
|
||||
{
|
||||
transaction,
|
||||
transactionId,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
events,
|
||||
container,
|
||||
}: FlowCancelOptions = {
|
||||
throwOnError: true,
|
||||
}
|
||||
) => {
|
||||
const newCancel = async ({
|
||||
transaction,
|
||||
transactionId,
|
||||
context: outerContext,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events,
|
||||
container,
|
||||
}: FlowCancelOptions = {}) => {
|
||||
throwOnError ??= true
|
||||
logOnError ??= false
|
||||
|
||||
const context = {
|
||||
...outerContext,
|
||||
@@ -299,6 +312,7 @@ function createContextualWorkflowRunner<
|
||||
resultFrom: undefined,
|
||||
isCancel: true,
|
||||
container,
|
||||
logOnError,
|
||||
},
|
||||
transaction ?? transactionId!,
|
||||
undefined,
|
||||
@@ -478,7 +492,12 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
|
||||
function attachOnFinishReleaseEvents(
|
||||
events: DistributedTransactionEvents = {},
|
||||
eventGroupId: string,
|
||||
flow: LocalWorkflow
|
||||
flow: LocalWorkflow,
|
||||
{
|
||||
logOnError,
|
||||
}: {
|
||||
logOnError?: boolean
|
||||
} = {}
|
||||
) {
|
||||
const onFinish = events.onFinish
|
||||
|
||||
@@ -487,6 +506,30 @@ function attachOnFinishReleaseEvents(
|
||||
result?: unknown
|
||||
errors?: unknown[]
|
||||
}) => {
|
||||
const { transaction } = args
|
||||
|
||||
const logger =
|
||||
(flow.container as MedusaContainer).resolve(
|
||||
ContainerRegistrationKeys.LOGGER,
|
||||
{ allowUnregistered: true }
|
||||
) || console
|
||||
|
||||
if (logOnError) {
|
||||
const TERMINAL_SIZE = process.stdout?.columns ?? 60
|
||||
const separator = new Array(TERMINAL_SIZE).join("-")
|
||||
|
||||
const worflowName = transaction.getFlow().modelId
|
||||
const allWorkflowErrors = transaction
|
||||
.getErrors()
|
||||
.map(
|
||||
(err) =>
|
||||
`${worflowName}:${err?.action}:${err?.handlerType} - ${err?.error?.message}${EOL}${err?.error?.stack}`
|
||||
)
|
||||
.join(EOL + separator + EOL)
|
||||
|
||||
logger.error(allWorkflowErrors)
|
||||
}
|
||||
|
||||
await onFinish?.(args)
|
||||
|
||||
const eventBusService = (flow.container as MedusaContainer).resolve(
|
||||
@@ -498,13 +541,6 @@ function attachOnFinishReleaseEvents(
|
||||
return
|
||||
}
|
||||
|
||||
const logger =
|
||||
(flow.container as MedusaContainer).resolve(
|
||||
ContainerRegistrationKeys.LOGGER,
|
||||
{ allowUnregistered: true }
|
||||
) || console
|
||||
|
||||
const { transaction } = args
|
||||
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
|
||||
|
||||
if (failedStatus.includes(transaction.getState())) {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionType,
|
||||
DistributedTransactionEvents,
|
||||
DistributedTransactionType,
|
||||
TransactionHandlerType,
|
||||
TransactionStep,
|
||||
WorkflowScheduler,
|
||||
@@ -109,6 +109,7 @@ export class WorkflowOrchestratorService {
|
||||
transactionId,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events: eventHandlers,
|
||||
container,
|
||||
} = options ?? {}
|
||||
@@ -148,6 +149,7 @@ export class WorkflowOrchestratorService {
|
||||
const ret = await flow.run({
|
||||
input,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
context,
|
||||
events,
|
||||
@@ -223,6 +225,7 @@ export class WorkflowOrchestratorService {
|
||||
const {
|
||||
context,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
@@ -251,6 +254,7 @@ export class WorkflowOrchestratorService {
|
||||
context,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events,
|
||||
response: stepResponse,
|
||||
})
|
||||
@@ -285,6 +289,7 @@ export class WorkflowOrchestratorService {
|
||||
const {
|
||||
context,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
@@ -313,6 +318,7 @@ export class WorkflowOrchestratorService {
|
||||
context,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events,
|
||||
response: stepResponse,
|
||||
})
|
||||
|
||||
@@ -11,20 +11,20 @@ import {
|
||||
} from "@medusajs/types"
|
||||
import {
|
||||
ContainerRegistrationKeys,
|
||||
createMedusaContainer,
|
||||
Module,
|
||||
Modules,
|
||||
TransactionHandlerType,
|
||||
TransactionStepState,
|
||||
createMedusaContainer,
|
||||
} from "@medusajs/utils"
|
||||
import { WorkflowsModuleService } from "@medusajs/workflow-engine-inmemory/dist/services"
|
||||
import { asFunction, asValue } from "awilix"
|
||||
import Redis from "ioredis"
|
||||
import { knex } from "knex"
|
||||
import { setTimeout } from "timers/promises"
|
||||
import "../__fixtures__"
|
||||
import { createScheduled } from "../__fixtures__/workflow_scheduled"
|
||||
import { DB_URL, TestDatabase } from "../utils"
|
||||
import { WorkflowsModuleService } from "@medusajs/workflow-engine-inmemory/dist/services"
|
||||
import Redis from "ioredis"
|
||||
|
||||
jest.setTimeout(100000)
|
||||
|
||||
@@ -216,6 +216,7 @@ describe("Workflow Orchestrator module", function () {
|
||||
myInput: "123",
|
||||
},
|
||||
throwOnError: false,
|
||||
logOnError: true,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import {
|
||||
DistributedTransaction,
|
||||
DistributedTransactionType,
|
||||
DistributedTransactionEvents,
|
||||
DistributedTransactionType,
|
||||
TransactionHandlerType,
|
||||
TransactionStep,
|
||||
WorkflowScheduler,
|
||||
@@ -12,12 +12,12 @@ import {
|
||||
Logger,
|
||||
MedusaContainer,
|
||||
} from "@medusajs/types"
|
||||
import { InjectSharedContext, isString, MedusaContext } from "@medusajs/utils"
|
||||
import { InjectSharedContext, MedusaContext, isString } from "@medusajs/utils"
|
||||
import {
|
||||
FlowRunOptions,
|
||||
MedusaWorkflow,
|
||||
resolveValue,
|
||||
ReturnWorkflow,
|
||||
resolveValue,
|
||||
} from "@medusajs/workflows-sdk"
|
||||
import Redis from "ioredis"
|
||||
import { ulid } from "ulid"
|
||||
@@ -158,6 +158,7 @@ export class WorkflowOrchestratorService {
|
||||
transactionId,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events: eventHandlers,
|
||||
container,
|
||||
} = options ?? {}
|
||||
@@ -191,6 +192,7 @@ export class WorkflowOrchestratorService {
|
||||
const ret = await flow.run({
|
||||
input,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
context,
|
||||
events,
|
||||
@@ -266,6 +268,7 @@ export class WorkflowOrchestratorService {
|
||||
const {
|
||||
context,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
@@ -294,6 +297,7 @@ export class WorkflowOrchestratorService {
|
||||
context,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events,
|
||||
response: stepResponse,
|
||||
})
|
||||
@@ -328,6 +332,7 @@ export class WorkflowOrchestratorService {
|
||||
const {
|
||||
context,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
resultFrom,
|
||||
container,
|
||||
events: eventHandlers,
|
||||
@@ -356,6 +361,7 @@ export class WorkflowOrchestratorService {
|
||||
context,
|
||||
resultFrom,
|
||||
throwOnError,
|
||||
logOnError,
|
||||
events,
|
||||
response: stepResponse,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user