chore: ensure the event group id is kept for async workflow (#7668)

* chore: ensure the event group id is kept for async workflow

* revert test

* fix tests

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
Co-authored-by: Riqwan Thamir <rmthamir@gmail.com>
This commit is contained in:
Adrien de Peretti
2024-06-11 13:20:17 +02:00
committed by GitHub
parent 3d72002c28
commit afae395fea
6 changed files with 157 additions and 18 deletions

View File

@@ -28,6 +28,10 @@ export type TransactionFlow = {
options?: TransactionModelOptions
definition: TransactionStepsDefinition
transactionId: string
metadata?: {
eventGroupId?: string
[key: string]: unknown
}
hasAsyncSteps: boolean
hasFailedSteps: boolean
hasWaitingSteps: boolean
@@ -839,7 +843,7 @@ export class TransactionOrchestrator extends EventEmitter {
await this.executeNext(transaction)
}
private createTransactionFlow(transactionId: string): TransactionFlow {
private createTransactionFlow(transactionId: string, flowMetadata?: TransactionFlow['metadata']): TransactionFlow {
const [steps, features] = TransactionOrchestrator.buildSteps(
this.definition
)
@@ -864,6 +868,7 @@ export class TransactionOrchestrator extends EventEmitter {
modelId: this.id,
options: this.options,
transactionId: transactionId,
metadata: flowMetadata,
hasAsyncSteps,
hasFailedSteps: false,
hasSkippedSteps: false,
@@ -1006,11 +1011,13 @@ export class TransactionOrchestrator extends EventEmitter {
* @param transactionId - unique identifier of the transaction
* @param handler - function to handle action of the transaction
* @param payload - payload to be passed to all the transaction steps
* @param flowMetadata - flow metadata which can include event group id for example
*/
public async beginTransaction(
transactionId: string,
handler: TransactionStepHandler,
payload?: unknown
payload?: unknown,
flowMetadata?: TransactionFlow["metadata"]
): Promise<DistributedTransaction> {
const existingTransaction =
await TransactionOrchestrator.loadTransactionById(this.id, transactionId)
@@ -1018,7 +1025,7 @@ export class TransactionOrchestrator extends EventEmitter {
let newTransaction = false
let modelFlow: TransactionFlow
if (!existingTransaction) {
modelFlow = this.createTransactionFlow(transactionId)
modelFlow = this.createTransactionFlow(transactionId, flowMetadata)
newTransaction = true
} else {
modelFlow = existingTransaction.flow

View File

@@ -13,6 +13,7 @@ import {
DistributedTransaction,
DistributedTransactionEvent,
DistributedTransactionEvents,
TransactionFlow,
TransactionModelOptions,
TransactionOrchestrator,
TransactionStepsDefinition,
@@ -332,7 +333,8 @@ export class LocalWorkflow {
uniqueTransactionId: string,
input?: unknown,
context?: Context,
subscribe?: DistributedTransactionEvents
subscribe?: DistributedTransactionEvents,
flowMetadata?: TransactionFlow["metadata"]
) {
if (this.flow.hasChanges) {
this.commit()
@@ -343,7 +345,8 @@ export class LocalWorkflow {
const transaction = await orchestrator.beginTransaction(
uniqueTransactionId,
handler(this.container_, context),
input
input,
flowMetadata
)
const { cleanUpEventListeners } = this.registerEventCallbacks({

View File

@@ -1,10 +1,12 @@
import { MedusaModule } from "@medusajs/modules-sdk"
import {
DistributedTransaction,
DistributedTransactionEvents,
LocalWorkflow,
TransactionHandlerType,
TransactionState,
} from "@medusajs/orchestration"
import { LoadedModule, MedusaContainer } from "@medusajs/types"
import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { isPresent, MedusaContextType } from "@medusajs/utils"
import { EOL } from "os"
import { ulid } from "ulid"
@@ -59,7 +61,10 @@ function createContextualWorkflowRunner<
isCancel = false,
container: executionContainer,
},
...args
transactionOrIdOrIdempotencyKey: DistributedTransaction | string,
input: unknown,
context: Context,
events: DistributedTransactionEvents | undefined = {}
) => {
if (!executionContainer) {
const container_ = flow.container as MedusaContainer
@@ -74,6 +79,18 @@ function createContextualWorkflowRunner<
flow.container = executionContainer
}
const { eventGroupId } = context
const flowMetadata = {
eventGroupId,
}
const args = [
transactionOrIdOrIdempotencyKey,
input,
context,
events,
flowMetadata,
]
const transaction = await method.apply(method, args)
let errors = transaction.getErrors(TransactionHandlerType.INVOKE)
@@ -136,7 +153,7 @@ function createContextualWorkflowRunner<
const context = {
...outerContext,
__type: MedusaContextType,
__type: MedusaContextType as Context["__type"],
}
context.transactionId ??= ulid()
@@ -191,7 +208,7 @@ function createContextualWorkflowRunner<
const context = {
...outerContext,
transactionId,
__type: MedusaContextType,
__type: MedusaContextType as Context["__type"],
}
context.eventGroupId ??= ulid()
@@ -229,7 +246,7 @@ function createContextualWorkflowRunner<
const context = {
...outerContext,
transactionId,
__type: MedusaContextType,
__type: MedusaContextType as Context["__type"],
}
context.eventGroupId ??= ulid()
@@ -262,7 +279,7 @@ function createContextualWorkflowRunner<
const context = {
...outerContext,
transactionId,
__type: MedusaContextType,
__type: MedusaContextType as Context["__type"],
}
context.eventGroupId ??= ulid()
@@ -275,7 +292,8 @@ function createContextualWorkflowRunner<
isCancel: true,
container,
},
transaction ?? transactionId,
transaction ?? transactionId!,
undefined,
context,
events
)

View File

@@ -135,7 +135,9 @@ function applyStep<
attempt: metadata.attempt,
container: stepArguments.container,
metadata,
eventGroupId: stepArguments.context!.eventGroupId,
eventGroupId:
stepArguments.transaction.getFlow()?.metadata?.eventGroupId ??
stepArguments.context!.eventGroupId,
transactionId: stepArguments.context!.transactionId,
context: stepArguments.context!,
}

View File

@@ -0,0 +1,40 @@
import { createStep, createWorkflow } from "@medusajs/workflows-sdk"
import { setTimeout } from "timers/promises"
export const workflowEventGroupIdStep1Mock = jest.fn(async (input) => {
await setTimeout(200)
})
export const workflowEventGroupIdStep2Mock = jest.fn(async (input) => {
return
})
const step_1_background = createStep(
{
name: "step_1_event_group_id_background",
async: true,
},
workflowEventGroupIdStep1Mock
)
const step_2 = createStep(
{
name: "step_2_event_group_id",
async: true,
},
workflowEventGroupIdStep2Mock
)
export const eventGroupWorkflowId = "workflow_event_group_id"
createWorkflow(
{
name: eventGroupWorkflowId,
},
function (input) {
const resp = step_1_background(input)
step_2()
return resp
}
)

View File

@@ -1,14 +1,22 @@
import { MedusaApp } from "@medusajs/modules-sdk"
import { RemoteJoinerQuery } from "@medusajs/types"
import { WorkflowManager } from "@medusajs/orchestration"
import {
Context,
IWorkflowEngineService,
RemoteJoinerQuery,
} from "@medusajs/types"
import { TransactionHandlerType } from "@medusajs/utils"
import { IWorkflowEngineService, MedusaWorkflow } from "@medusajs/workflows-sdk"
import { knex } from "knex"
import { setTimeout as setTimeoutPromise } from "timers/promises"
import "../__fixtures__"
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
import {
eventGroupWorkflowId,
workflowEventGroupIdStep1Mock,
workflowEventGroupIdStep2Mock,
} from "../__fixtures__/workflow_event_group_id"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { DB_URL, TestDatabase } from "../utils"
import { WorkflowManager } from "@medusajs/orchestration"
const sharedPgConnection = knex<any, any>({
client: "pg",
@@ -59,7 +67,68 @@ describe("Workflow Orchestrator module", function () {
workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService
})
afterEach(afterEach_)
it("should execute an async workflow keeping track of the event group id provided in the context", async () => {
const eventGroupId = "event-group-id"
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
context: {
eventGroupId,
transactionId: "transaction_id",
},
throwOnError: true,
})
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id",
},
stepResponse: { hey: "oh" },
})
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
})
it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => {
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
context: {
transactionId: "transaction_id_2",
},
throwOnError: true,
})
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id_2",
},
stepResponse: { hey: "oh" },
})
const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock
.calls[0][1] as unknown as Context)!.eventGroupId
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
})
describe("Testing basic workflow", function () {
it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => {
await workflowOrcModule.run("workflow_1", {
@@ -174,7 +243,7 @@ describe("Workflow Orchestrator module", function () {
expect(transaction.flow.state).toEqual("reverted")
})
it("should subsctibe to a async workflow and receive the response when it finishes", (done) => {
it("should subscribe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const onFinish = jest.fn(() => {