Chore/rm main entity concept (#7709)

**What**
Update the `MedusaService` class, factory and types to remove the concept of main modules. The idea being that all method will be explicitly named and suffixes to represent the object you are trying to manipulate.
This pr also includes various fixes in different modules

Co-authored-by: Stevche Radevski <4820812+sradevski@users.noreply.github.com>
Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2024-06-19 15:02:16 +02:00
committed by GitHub
parent 2895ccfba8
commit 48963f55ef
533 changed files with 6469 additions and 9769 deletions

View File

@@ -1,12 +1,7 @@
import { MedusaApp } from "@medusajs/modules-sdk"
import { Modules, RemoteQueryFunction } from "@medusajs/modules-sdk"
import { WorkflowManager } from "@medusajs/orchestration"
import {
Context,
IWorkflowEngineService,
RemoteJoinerQuery,
} from "@medusajs/types"
import { Context, IWorkflowEngineService } from "@medusajs/types"
import { TransactionHandlerType } from "@medusajs/utils"
import { knex } from "knex"
import { setTimeout as setTimeoutPromise } from "timers/promises"
import "../__fixtures__"
import { workflow2Step2Invoke, workflow2Step3Invoke } from "../__fixtures__"
@@ -16,316 +11,284 @@ import {
workflowEventGroupIdStep2Mock,
} from "../__fixtures__/workflow_event_group_id"
import { createScheduled } from "../__fixtures__/workflow_scheduled"
import { DB_URL, TestDatabase } from "../utils"
const sharedPgConnection = knex<any, any>({
client: "pg",
searchPath: process.env.MEDUSA_WORKFLOW_ENGINE_DB_SCHEMA,
connection: {
connectionString: DB_URL,
debug: false,
},
})
const afterEach_ = async () => {
await TestDatabase.clearTables(sharedPgConnection)
}
import { moduleIntegrationTestRunner } from "medusa-test-utils"
jest.setTimeout(100000)
describe("Workflow Orchestrator module", function () {
let workflowOrcModule: IWorkflowEngineService
let query: (
query: string | RemoteJoinerQuery | object,
variables?: Record<string, unknown>
) => Promise<any>
moduleIntegrationTestRunner<IWorkflowEngineService>({
moduleName: Modules.WORKFLOW_ENGINE,
resolve: __dirname + "/../..",
testSuite: ({ service: workflowOrcModule, medusaApp }) => {
describe("Workflow Orchestrator module", function () {
let query: RemoteQueryFunction
afterEach(afterEach_)
beforeAll(async () => {
const {
runMigrations,
query: remoteQuery,
modules,
} = await MedusaApp({
sharedResourcesConfig: {
database: {
connection: sharedPgConnection,
},
},
modulesConfig: {
workflows: {
resolve: __dirname + "/../..",
},
},
})
query = remoteQuery
await runMigrations()
workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService
})
it("should execute an async workflow keeping track of the event group id provided in the context", async () => {
const eventGroupId = "event-group-id"
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
context: {
eventGroupId,
transactionId: "transaction_id",
},
throwOnError: true,
})
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id",
},
stepResponse: { hey: "oh" },
})
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
})
it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => {
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
context: {
transactionId: "transaction_id_2",
},
throwOnError: true,
})
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id_2",
},
stepResponse: { hey: "oh" },
})
const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock
.calls[0][1] as unknown as Context)!.eventGroupId
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
})
describe("Testing basic workflow", function () {
it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => {
await workflowOrcModule.run("workflow_1", {
input: {
value: "123",
},
throwOnError: true,
beforeEach(() => {
query = medusaApp.query
})
let executionsList = await query({
workflow_executions: {
fields: ["workflow_id", "transaction_id", "state"],
},
})
it("should execute an async workflow keeping track of the event group id provided in the context", async () => {
const eventGroupId = "event-group-id"
expect(executionsList).toHaveLength(1)
const { result } = await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_1",
transactionId: executionsList[0].transaction_id,
},
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(0)
expect(result).toEqual({
done: {
inputFromSyncStep: "oh",
},
})
})
it("should return a list of workflow executions and keep it saved when there is a retentionTime set", async () => {
await workflowOrcModule.run("workflow_2", {
input: {
value: "123",
},
throwOnError: true,
transactionId: "transaction_1",
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_2",
transactionId: "transaction_1",
},
stepResponse: { uhuuuu: "yeaah!" },
})
expect(workflow2Step2Invoke).toBeCalledTimes(2)
expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" })
expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({
hey: "async hello",
})
expect(workflow2Step3Invoke).toBeCalledTimes(1)
expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({
uhuuuu: "yeaah!",
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
})
it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
"workflow_step_timeout",
{
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
throwOnError: false,
}
)
context: {
eventGroupId,
transactionId: "transaction_id",
},
throwOnError: true,
})
expect(transaction.flow.state).toEqual("reverted")
})
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id",
},
stepResponse: { hey: "oh" },
})
it("should revert the entire transaction when the transaction timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
"workflow_transaction_timeout",
{
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId })
)
})
it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => {
await workflowOrcModule.run(eventGroupWorkflowId, {
input: {},
throwOnError: false,
}
)
context: {
transactionId: "transaction_id_2",
},
throwOnError: true,
})
await setTimeoutPromise(200)
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "step_1_event_group_id_background",
workflowId: eventGroupWorkflowId,
transactionId: "transaction_id_2",
},
stepResponse: { hey: "oh" },
})
expect(transaction.flow.state).toEqual("reverted")
})
const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock
.calls[0][1] as unknown as Context)!.eventGroupId
it("should subscribe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const onFinish = jest.fn(() => {
done()
// Validate context event group id
expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual(
expect.objectContaining({ eventGroupId: generatedEventGroupId })
)
})
void workflowOrcModule.subscribe({
workflowId: "workflow_async_background",
transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
onFinish()
}
},
describe("Testing basic workflow", function () {
it("should return a list of workflow executions and remove after completed when there is no retentionTime set", async () => {
await workflowOrcModule.run("workflow_1", {
input: {
value: "123",
},
throwOnError: true,
})
let executionsList = await query({
workflow_executions: {
fields: ["workflow_id", "transaction_id", "state"],
},
})
expect(executionsList).toHaveLength(1)
const { result } = await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_1",
transactionId: executionsList[0].transaction_id,
},
stepResponse: { uhuuuu: "yeaah!" },
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(0)
expect(result).toEqual({
done: {
inputFromSyncStep: "oh",
},
})
})
it("should return a list of workflow executions and keep it saved when there is a retentionTime set", async () => {
await workflowOrcModule.run("workflow_2", {
input: {
value: "123",
},
throwOnError: true,
transactionId: "transaction_1",
})
let executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
await workflowOrcModule.setStepSuccess({
idempotencyKey: {
action: TransactionHandlerType.INVOKE,
stepId: "new_step_name",
workflowId: "workflow_2",
transactionId: "transaction_1",
},
stepResponse: { uhuuuu: "yeaah!" },
})
expect(workflow2Step2Invoke).toBeCalledTimes(2)
expect(workflow2Step2Invoke.mock.calls[0][0]).toEqual({ hey: "oh" })
expect(workflow2Step2Invoke.mock.calls[1][0]).toEqual({
hey: "async hello",
})
expect(workflow2Step3Invoke).toBeCalledTimes(1)
expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({
uhuuuu: "yeaah!",
})
executionsList = await query({
workflow_executions: {
fields: ["id"],
},
})
expect(executionsList).toHaveLength(1)
})
it("should revert the entire transaction when a step timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
"workflow_step_timeout",
{
input: {},
throwOnError: false,
}
)
expect(transaction.flow.state).toEqual("reverted")
})
it("should revert the entire transaction when the transaction timeout expires", async () => {
const { transaction } = await workflowOrcModule.run(
"workflow_transaction_timeout",
{
input: {},
throwOnError: false,
}
)
await setTimeoutPromise(200)
expect(transaction.flow.state).toEqual("reverted")
})
it("should subscribe to a async workflow and receive the response when it finishes", (done) => {
const transactionId = "trx_123"
const onFinish = jest.fn(() => {
done()
})
void workflowOrcModule.subscribe({
workflowId: "workflow_async_background",
transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
onFinish()
}
},
})
void workflowOrcModule.run("workflow_async_background", {
input: {
myInput: "123",
},
transactionId,
throwOnError: false,
})
expect(onFinish).toHaveBeenCalledTimes(0)
})
})
void workflowOrcModule.run("workflow_async_background", {
input: {
myInput: "123",
},
transactionId,
throwOnError: false,
describe("Scheduled workflows", () => {
beforeAll(() => {
jest.useFakeTimers()
jest.spyOn(global, "setTimeout")
})
afterAll(() => {
jest.useRealTimers()
})
it("should execute a scheduled workflow", async () => {
const spy = createScheduled("standard")
jest.clearAllMocks()
await jest.runOnlyPendingTimersAsync()
expect(setTimeout).toHaveBeenCalledTimes(2)
expect(spy).toHaveBeenCalledTimes(1)
await jest.runOnlyPendingTimersAsync()
expect(setTimeout).toHaveBeenCalledTimes(3)
expect(spy).toHaveBeenCalledTimes(2)
})
it("should stop executions after the set number of executions", async () => {
const spy = await createScheduled("num-executions", {
cron: "* * * * * *",
numberOfExecutions: 2,
})
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(2)
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(2)
})
it("should remove scheduled workflow if workflow no longer exists", async () => {
const spy = await createScheduled("remove-scheduled", {
cron: "* * * * * *",
})
const logSpy = jest.spyOn(console, "warn")
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
WorkflowManager["workflows"].delete("remove-scheduled")
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
expect(logSpy).toHaveBeenCalledWith(
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
)
})
})
expect(onFinish).toHaveBeenCalledTimes(0)
})
})
describe("Scheduled workflows", () => {
beforeAll(() => {
jest.useFakeTimers()
jest.spyOn(global, "setTimeout")
})
afterAll(() => {
jest.useRealTimers()
})
it("should execute a scheduled workflow", async () => {
const spy = createScheduled("standard")
await jest.runOnlyPendingTimersAsync()
expect(setTimeout).toHaveBeenCalledTimes(2)
expect(spy).toHaveBeenCalledTimes(1)
await jest.runOnlyPendingTimersAsync()
expect(setTimeout).toHaveBeenCalledTimes(3)
expect(spy).toHaveBeenCalledTimes(2)
})
it("should stop executions after the set number of executions", async () => {
const spy = await createScheduled("num-executions", {
cron: "* * * * * *",
numberOfExecutions: 2,
})
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(2)
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(2)
})
it("should remove scheduled workflow if workflow no longer exists", async () => {
const spy = await createScheduled("remove-scheduled", {
cron: "* * * * * *",
})
const logSpy = jest.spyOn(console, "warn")
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
WorkflowManager["workflows"].delete("remove-scheduled")
await jest.runOnlyPendingTimersAsync()
expect(spy).toHaveBeenCalledTimes(1)
expect(logSpy).toHaveBeenCalledWith(
"Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler."
)
})
})
},
})

View File

@@ -1,6 +0,0 @@
if (typeof process.env.DB_TEMP_NAME === "undefined") {
const tempName = parseInt(process.env.JEST_WORKER_ID || "1")
process.env.DB_TEMP_NAME = `medusa-workflow-engine-inmemory-${tempName}`
}
process.env.MEDUSA_WORKFLOW_ENGINE_DB_SCHEMA = "public"

View File

@@ -1,3 +0,0 @@
import { JestUtils } from "medusa-test-utils"
JestUtils.afterAllHookDropDatabase()

View File

@@ -1,22 +0,0 @@
import * as process from "process"
const DB_HOST = process.env.DB_HOST ?? "localhost"
const DB_USERNAME = process.env.DB_USERNAME ?? ""
const DB_PASSWORD = process.env.DB_PASSWORD
const DB_NAME = process.env.DB_TEMP_NAME
export const DB_URL = `postgres://${DB_USERNAME}${
DB_PASSWORD ? `:${DB_PASSWORD}` : ""
}@${DB_HOST}/${DB_NAME}`
interface TestDatabase {
clearTables(knex): Promise<void>
}
export const TestDatabase: TestDatabase = {
clearTables: async (knex) => {
await knex.raw(`
TRUNCATE TABLE workflow_execution CASCADE;
`)
},
}

View File

@@ -1 +0,0 @@
export * from "./database"

View File

@@ -17,6 +17,4 @@ module.exports = {
testEnvironment: `node`,
moduleFileExtensions: [`js`, `ts`],
modulePathIgnorePatterns: ["dist/"],
setupFiles: ["<rootDir>/integration-tests/setup-env.js"],
setupFilesAfterEnv: ["<rootDir>/integration-tests/setup.js"],
}

View File

@@ -8,7 +8,7 @@
"dist"
],
"engines": {
"node": ">=16"
"node": ">=20"
},
"repository": {
"type": "git",

View File

@@ -1,34 +1,11 @@
import { Modules } from "@medusajs/modules-sdk"
import { ModuleJoinerConfig } from "@medusajs/types"
import { MapToConfig } from "@medusajs/utils"
import { WorkflowExecution } from "@models"
import moduleSchema from "./schema"
import {
buildEntitiesNameToLinkableKeysMap,
defineJoinerConfig,
MapToConfig,
} from "@medusajs/utils"
export const LinkableKeys = {
workflow_execution_id: WorkflowExecution.name,
}
export const joinerConfig = defineJoinerConfig(Modules.WORKFLOW_ENGINE)
const entityLinkableKeysMap: MapToConfig = {}
Object.entries(LinkableKeys).forEach(([key, value]) => {
entityLinkableKeysMap[value] ??= []
entityLinkableKeysMap[value].push({
mapTo: key,
valueFrom: key.split("_").pop()!,
})
})
export const entityNameToLinkableKeysMap: MapToConfig = entityLinkableKeysMap
export const joinerConfig: ModuleJoinerConfig = {
serviceName: Modules.WORKFLOW_ENGINE,
primaryKeys: ["id"],
schema: moduleSchema,
linkableKeys: LinkableKeys,
alias: {
name: ["workflow_execution", "workflow_executions"],
args: {
entity: WorkflowExecution.name,
methodSuffix: "WorkflowExecution",
},
},
}
export const entityNameToLinkableKeysMap: MapToConfig =
buildEntitiesNameToLinkableKeysMap(joinerConfig.linkableKeys)

View File

@@ -1,2 +0,0 @@
export { MikroOrmBaseRepository as BaseRepository } from "@medusajs/utils"
export { WorkflowExecutionRepository } from "./workflow-execution"

View File

@@ -1,7 +0,0 @@
import { DALUtils } from "@medusajs/utils"
import { WorkflowExecution } from "@models"
// eslint-disable-next-line max-len
export class WorkflowExecutionRepository extends DALUtils.mikroOrmBaseRepositoryFactory(
WorkflowExecution
) {}

View File

@@ -2,8 +2,8 @@ import {
Context,
DAL,
FindConfig,
IWorkflowEngineService,
InternalModuleDeclaration,
IWorkflowEngineService,
ModuleJoinerConfig,
ModulesSdkTypes,
WorkflowsSdkTypes,
@@ -93,7 +93,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
}
@InjectManager("baseRepository_")
async listWorkflowExecution(
async listWorkflowExecutions(
filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowsSdkTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}
@@ -128,7 +128,7 @@ export class WorkflowsModuleService implements IWorkflowEngineService {
}
@InjectManager("baseRepository_")
async listAndCountWorkflowExecution(
async listAndCountWorkflowExecutions(
filters: WorkflowsSdkTypes.FilterableWorkflowExecutionProps = {},
config: FindConfig<WorkflowsSdkTypes.WorkflowExecutionDTO> = {},
@MedusaContext() sharedContext: Context = {}