feat: Ensure async workflow executions have access to shared container (#8157)

* feat: Ensure async workflow executions have access to shared container

* fix: Register workflow worker on application start
This commit is contained in:
Stevche Radevski
2024-07-17 12:17:48 +02:00
committed by GitHub
parent 1acfdc4ffe
commit 26d600b6db
15 changed files with 140 additions and 38 deletions

View File

@@ -6,8 +6,10 @@ import {
} from "@medusajs/workflows-sdk"
export const createScheduled = (name: string, schedule?: SchedulerOptions) => {
const workflowScheduledStepInvoke = jest.fn((input, context) => {
return new StepResponse({})
const workflowScheduledStepInvoke = jest.fn((input, { container }) => {
return new StepResponse({
testValue: container.resolve("test-value"),
})
})
const step = createStep("step_1", workflowScheduledStepInvoke)

View File

@@ -17,7 +17,7 @@ import {
TransactionHandlerType,
TransactionStepState,
} from "@medusajs/utils"
import { asValue } from "awilix"
import { asFunction, asValue } from "awilix"
import { knex } from "knex"
import { setTimeout } from "timers/promises"
import "../__fixtures__"
@@ -54,6 +54,7 @@ describe("Workflow Orchestrator module", function () {
query: remoteQuery,
modules,
sharedContainer,
onApplicationStart,
} = await MedusaApp({
sharedContainer: container,
sharedResourcesConfig: {
@@ -73,6 +74,8 @@ describe("Workflow Orchestrator module", function () {
},
})
await onApplicationStart()
query = remoteQuery
sharedContainer_ = sharedContainer!
@@ -381,5 +384,21 @@ describe("Workflow Orchestrator module", function () {
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
)
})
it("the scheduled workflow should have access to the shared container", async () => {
sharedContainer_.register(
"test-value",
asFunction(() => "test")
)
const spy = await createScheduled("remove-scheduled", {
cron: "* * * * * *",
})
await setTimeout(1100)
expect(spy).toHaveBeenCalledTimes(1)
expect(spy).toHaveReturnedWith(
expect.objectContaining({ output: { testValue: "test" } })
)
})
})
})

View File

@@ -83,6 +83,7 @@ export class WorkflowOrchestratorService {
private instanceId = ulid()
protected redisPublisher: Redis
protected redisSubscriber: Redis
protected container_: MedusaContainer
private subscribers: Subscribers = new Map()
private activeStepsCount: number = 0
private logger: Logger
@@ -95,6 +96,7 @@ export class WorkflowOrchestratorService {
redisPublisher,
redisSubscriber,
logger,
sharedContainer,
}: {
dataLoaderOnly: boolean
redisDistributedTransactionStorage: RedisDistributedTransactionStorage
@@ -102,7 +104,9 @@ export class WorkflowOrchestratorService {
redisPublisher: Redis
redisSubscriber: Redis
logger: Logger
sharedContainer: MedusaContainer
}) {
this.container_ = sharedContainer
this.redisPublisher = redisPublisher
this.redisSubscriber = redisSubscriber
this.logger = logger
@@ -137,6 +141,10 @@ export class WorkflowOrchestratorService {
}
}
async onApplicationStart() {
await this.redisDistributedTransactionStorage_.onApplicationStart()
}
@InjectSharedContext()
async run<T = unknown>(
workflowIdOrWorkflow: string | ReturnWorkflow<any, any, any>,
@@ -175,7 +183,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const ret = await flow.run({
input,
@@ -230,7 +240,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const transaction = await flow.getRunningTransaction(transactionId, context)
@@ -266,7 +278,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,
@@ -326,7 +340,9 @@ export class WorkflowOrchestratorService {
throw new Error(`Workflow with id "${workflowId}" not found.`)
}
const flow = exportedWorkflow(container as MedusaContainer)
const flow = exportedWorkflow(
(container as MedusaContainer) ?? this.container_
)
const events = this.buildWorkflowEvents({
customEventHandlers: eventHandlers,

View File

@@ -60,6 +60,9 @@ export class WorkflowsModuleService<
onApplicationPrepareShutdown: async () => {
await this.workflowOrchestratorService_.onApplicationPrepareShutdown()
},
onApplicationStart: async () => {
await this.workflowOrchestratorService_.onApplicationStart()
},
}
@InjectSharedContext()

View File

@@ -29,6 +29,8 @@ export class RedisDistributedTransactionStorage
private workflowOrchestratorService_: WorkflowOrchestratorService
private redisClient: Redis
private redisWorkerConnection: Redis
private queueName: string
private queue: Queue
private worker: Worker
@@ -47,12 +49,24 @@ export class RedisDistributedTransactionStorage
}) {
this.workflowExecutionService_ = workflowExecutionService
this.logger_ = logger
this.redisClient = redisConnection
this.redisWorkerConnection = redisWorkerConnection
this.queueName = redisQueueName
this.queue = new Queue(redisQueueName, { connection: this.redisClient })
}
async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}
async onApplicationShutdown() {
await this.queue.close()
}
async onApplicationStart() {
this.worker = new Worker(
redisQueueName,
this.queueName,
async (job) => {
const allJobs = [
JobType.RETRY,
@@ -75,19 +89,10 @@ export class RedisDistributedTransactionStorage
)
}
},
{ connection: redisWorkerConnection }
{ connection: this.redisWorkerConnection }
)
}
async onApplicationPrepareShutdown() {
// Close worker gracefully, i.e. wait for the current jobs to finish
await this.worker.close()
}
async onApplicationShutdown() {
await this.queue.close()
}
setWorkflowOrchestratorService(workflowOrchestratorService) {
this.workflowOrchestratorService_ = workflowOrchestratorService
}