feat: workflows release events (#7664)

* feat: Release grouped events once a worfklow finish

* update common step

* update types

* tests

* optionality

* fies

* cancel flow on release failure

* clear events on fail

* more tests

* log errors

* log more errors

* update missing interface method declaration

* fix missing return

* update mock

* fix tests
This commit is contained in:
Adrien de Peretti
2024-06-11 15:48:15 +02:00
committed by GitHub
parent bef5941714
commit dd0b9f0805
6 changed files with 216 additions and 16 deletions

View File

@@ -4,17 +4,16 @@ import { createStep } from "@medusajs/workflows-sdk"
export const releaseEventsStepId = "release-events-step"
export const releaseEventsStep = createStep(
releaseEventsStepId,
async (
input: void,
{
container,
metadata: {
/* eventGroupId */
},
async (input: void, { container, eventGroupId }) => {
const eventBusService = container.resolve(
ModuleRegistrationName.EVENT_BUS,
{ allowUnregistered: true }
)
if (!eventBusService || !eventGroupId) {
return
}
) => {
const eventBus = container.resolve(ModuleRegistrationName.EVENT_BUS)
// await eventBus.release
await eventBusService.releaseGroupedEvents(eventGroupId)
},
async (data: void) => {}
)

View File

@@ -22,4 +22,12 @@ export default class EventBusService implements IEventBusModuleService {
): this {
return this
}
releaseGroupedEvents(eventGroupId: string): Promise<void> {
return Promise.resolve()
}
clearGroupedEvents(eventGroupId: string): Promise<void> {
return Promise.resolve()
}
}

View File

@@ -39,7 +39,7 @@ export class LocalWorkflow {
protected handlers: Map<string, StepHandler>
protected medusaContext?: Context
get container() {
get container(): MedusaContainer {
return this.container_
}

View File

@@ -17,4 +17,7 @@ export interface IEventBusModuleService {
subscriber: Subscriber,
context?: SubscriberContext
): this
releaseGroupedEvents(eventGroupId: string): Promise<void>
clearGroupedEvents(eventGroupId: string): Promise<void>
}

View File

@@ -7,7 +7,11 @@ import {
TransactionState,
} from "@medusajs/orchestration"
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { isPresent, MedusaContextType } from "@medusajs/utils"
import {
isPresent,
MedusaContextType,
ModuleRegistrationName,
} from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
import { MedusaWorkflow } from "../medusa-workflow"
@@ -21,6 +25,7 @@ import {
MainExportedWorkflow,
WorkflowResult,
} from "./type"
import { ContainerRegistrationKeys } from "@medusajs/utils/dist"
function createContextualWorkflowRunner<
TData = unknown,
@@ -80,6 +85,9 @@ function createContextualWorkflowRunner<
}
const { eventGroupId } = context
attachOnFinishReleaseEvents(events, eventGroupId!, flow)
const flowMetadata = {
eventGroupId,
}
@@ -466,3 +474,58 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
MedusaWorkflow.registerWorkflow(workflowId, exportedWorkflow)
return exportedWorkflow as MainExportedWorkflow<TData, TResult>
}
function attachOnFinishReleaseEvents(
events: DistributedTransactionEvents = {},
eventGroupId: string,
flow: LocalWorkflow
) {
const onFinish = events.onFinish
const wrappedOnFinish = async (args: {
transaction: DistributedTransaction
result?: unknown
errors?: unknown[]
}) => {
await onFinish?.(args)
const eventBusService = (flow.container as MedusaContainer).resolve(
ModuleRegistrationName.EVENT_BUS,
{ allowUnregistered: true }
)
if (!eventBusService || !eventGroupId) {
return
}
const logger =
(flow.container as MedusaContainer).resolve(
ContainerRegistrationKeys.LOGGER,
{ allowUnregistered: true }
) || console
const { transaction } = args
const failedStatus = [TransactionState.FAILED, TransactionState.REVERTED]
if (failedStatus.includes(transaction.getState())) {
return await eventBusService
.clearGroupedEvents(eventGroupId)
.catch(() => {
logger.warn(
`Failed to clear events for eventGroupId - ${eventGroupId}`
)
})
}
await eventBusService.releaseGroupedEvents(eventGroupId).catch((e) => {
logger.error(
`Failed to release grouped events for eventGroupId: ${eventGroupId}`,
e
)
return flow.cancel(transaction)
})
}
events.onFinish = wrappedOnFinish
}

View File

@@ -1,17 +1,23 @@
import { WorkflowManager, WorkflowScheduler } from "@medusajs/orchestration"
import { promiseAll } from "@medusajs/utils"
import {
composeMessage,
createMedusaContainer,
ModuleRegistrationName,
promiseAll,
} from "@medusajs/utils"
import {
createStep,
createWorkflow,
hook,
MedusaWorkflow,
parallelize,
StepResponse,
transform,
} from "../.."
} from ".."
import { MedusaWorkflow } from "../../../medusa-workflow"
import { asValue } from "awilix"
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
jest.setTimeout(30000)
import { IDistributedSchedulerStorage, SchedulerOptions } from "../../dist"
class MockSchedulerStorage implements IDistributedSchedulerStorage {
async schedule(
@@ -2086,4 +2092,125 @@ describe("Workflow composer", function () {
return [{ step1_nested_obj: obj.nested }, s2]
})
})
it("should emit grouped events once the workflow is executed and finished", async () => {
const container = createMedusaContainer()
container.register({
[ModuleRegistrationName.EVENT_BUS]: asValue({
releaseGroupedEvents: jest
.fn()
.mockImplementation(() => Promise.resolve()),
emit: jest.fn(),
}),
})
const mockStep1Fn = jest
.fn()
.mockImplementation(
async (input, { context: stepContext, container }) => {
const eventBusService = container.resolve(
ModuleRegistrationName.EVENT_BUS
)
await eventBusService.emit(
"event1",
composeMessage("event1", {
data: { eventGroupId: stepContext.eventGroupId },
context: stepContext,
object: "object",
source: "service",
action: "action",
})
)
}
)
const step1 = createStep("step1", mockStep1Fn)
const workflow = createWorkflow("workflow1", function (input) {
step1(input)
})
await workflow(container).run({
context: {
eventGroupId: "event-group-id",
},
})
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
const eventBusMock = container.resolve(ModuleRegistrationName.EVENT_BUS)
expect(eventBusMock.emit).toHaveBeenCalledTimes(1)
expect(eventBusMock.emit.mock.calls[0][0]).toEqual("event1")
expect(eventBusMock.releaseGroupedEvents).toHaveBeenCalledTimes(1)
expect(eventBusMock.releaseGroupedEvents.mock.calls[0][0]).toEqual(
"event-group-id"
)
})
it("should clear grouped events on fail state", async () => {
const container = createMedusaContainer()
container.register({
[ModuleRegistrationName.EVENT_BUS]: asValue({
releaseGroupedEvents: jest
.fn()
.mockImplementation(() => Promise.resolve()),
clearGroupedEvents: jest
.fn()
.mockImplementation(() => Promise.resolve()),
emit: jest.fn(),
}),
})
const mockStep1Fn = jest
.fn()
.mockImplementation(
async (input, { context: stepContext, container }) => {
const eventBusService = container.resolve(
ModuleRegistrationName.EVENT_BUS
)
await eventBusService.emit(
"event1",
composeMessage("event1", {
data: { eventGroupId: stepContext.eventGroupId },
context: stepContext,
object: "object",
source: "service",
action: "action",
})
)
}
)
const mockStep2Fn = jest.fn().mockImplementation(() => {
throw new Error("invoke fail")
})
const step1 = createStep("step1", mockStep1Fn)
const step2 = createStep("step2", mockStep2Fn)
const workflow = createWorkflow("workflow1", function (input) {
step1(input)
step2()
})
await workflow(container).run({
context: {
eventGroupId: "event-group-id",
},
throwOnError: false,
})
const eventBusMock = container.resolve(ModuleRegistrationName.EVENT_BUS)
expect(eventBusMock.emit).toHaveBeenCalledTimes(1)
expect(eventBusMock.releaseGroupedEvents).toHaveBeenCalledTimes(0)
expect(eventBusMock.clearGroupedEvents).toHaveBeenCalledTimes(1)
expect(eventBusMock.clearGroupedEvents).toHaveBeenCalledWith(
"event-group-id"
)
})
})