diff --git a/.changeset/sixty-years-invite.md b/.changeset/sixty-years-invite.md new file mode 100644 index 0000000000..b771bef6db --- /dev/null +++ b/.changeset/sixty-years-invite.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/utils": patch +--- + +fix(workflow-engine-*): Cleanup expired executions diff --git a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts index c16a0f07d3..af51bcfccf 100644 --- a/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/abstract-storage.ts @@ -21,7 +21,6 @@ export interface IDistributedTransactionStorage { key: string, options?: TransactionOptions & { isCancelling?: boolean } ): Promise - list(): Promise save( key: string, data: TransactionCheckpoint, @@ -56,6 +55,7 @@ export interface IDistributedTransactionStorage { transaction: DistributedTransactionType, step: TransactionStep ): Promise + clearExpiredExecutions(): Promise } export abstract class DistributedSchedulerStorage @@ -92,10 +92,6 @@ export abstract class DistributedTransactionStorage throw new Error("Method 'get' not implemented.") } - async list(): Promise { - throw new Error("Method 'list' not implemented.") - } - async save( key: string, data: TransactionCheckpoint, @@ -149,4 +145,8 @@ export abstract class DistributedTransactionStorage ): Promise { throw new Error("Method 'clearStepTimeout' not implemented.") } + + async clearExpiredExecutions(): Promise { + throw new Error("Method 'clearExpiredExecutions' not implemented.") + } } diff --git a/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts b/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts index b6745873ac..cf3020f5a9 100644 --- a/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts +++ b/packages/core/orchestration/src/transaction/datastore/base-in-memory-storage.ts @@ -41,4 +41,6 @@ export class BaseInMemoryDistributedTransactionStorage extends DistributedTransa this.storage.set(key, data) } } + + async clearExpiredExecutions(): Promise {} } diff --git a/packages/core/utils/src/dml/__tests__/entity-builder.spec.ts b/packages/core/utils/src/dml/__tests__/entity-builder.spec.ts index 305e2cecbf..b2bc86b83f 100644 --- a/packages/core/utils/src/dml/__tests__/entity-builder.spec.ts +++ b/packages/core/utils/src/dml/__tests__/entity-builder.spec.ts @@ -23,7 +23,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", nullable: false, - onCreate: expect.any(Function), + kind: "scalar", setter: false, type: "date", @@ -45,8 +45,6 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", nullable: false, - onCreate: expect.any(Function), - onUpdate: expect.any(Function), kind: "scalar", setter: false, type: "date", @@ -148,7 +146,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -191,8 +189,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -292,7 +289,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -326,8 +323,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -427,7 +423,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -461,8 +457,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -575,7 +570,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -587,8 +582,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -721,7 +715,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -733,8 +727,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -868,7 +861,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -880,8 +873,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -981,7 +973,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -993,8 +985,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1103,7 +1094,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1115,8 +1106,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1227,7 +1217,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1239,8 +1229,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1363,7 +1352,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1375,8 +1364,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1459,7 +1447,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1482,8 +1470,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1563,7 +1550,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1575,8 +1562,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1670,7 +1656,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1682,8 +1668,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1783,7 +1768,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1795,8 +1780,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1896,7 +1880,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -1908,8 +1892,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2011,7 +1994,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2023,8 +2006,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2168,7 +2150,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2180,8 +2162,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2288,7 +2269,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2300,8 +2281,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2407,7 +2387,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2419,8 +2399,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2506,7 +2485,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2518,8 +2497,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2600,7 +2578,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2612,8 +2590,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2687,7 +2664,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2699,8 +2676,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2774,7 +2750,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2786,8 +2762,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2867,7 +2842,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2879,8 +2854,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2929,7 +2903,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -2941,8 +2915,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3029,7 +3002,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3041,8 +3014,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3114,7 +3086,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3126,8 +3098,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3229,7 +3200,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3241,8 +3212,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3342,7 +3312,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3354,8 +3324,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3446,7 +3415,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3458,8 +3427,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3554,7 +3522,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3566,8 +3534,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3616,7 +3583,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3628,8 +3595,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3732,7 +3698,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3744,8 +3710,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3817,7 +3782,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -3829,8 +3794,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4202,7 +4166,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4214,8 +4178,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4293,7 +4256,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4305,8 +4268,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4386,7 +4348,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4398,8 +4360,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4481,7 +4442,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4493,8 +4454,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4560,7 +4520,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4572,8 +4532,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4683,7 +4642,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4695,8 +4654,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4766,7 +4724,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4778,8 +4736,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4898,7 +4855,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4910,8 +4867,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4981,7 +4937,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -4993,8 +4949,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5092,7 +5047,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5104,8 +5059,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5170,7 +5124,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5182,8 +5136,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5281,7 +5234,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5293,8 +5246,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5359,7 +5311,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5371,8 +5323,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5517,7 +5468,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5529,8 +5480,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5601,7 +5551,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5613,8 +5563,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5718,7 +5667,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5730,8 +5679,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5802,7 +5750,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5814,8 +5762,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5916,7 +5863,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -5928,8 +5875,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6033,7 +5979,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6045,8 +5991,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6149,7 +6094,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6161,8 +6106,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6218,7 +6162,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6230,8 +6174,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6330,7 +6273,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6342,8 +6285,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6399,7 +6341,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6411,8 +6353,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6625,7 +6566,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6637,8 +6578,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6694,7 +6634,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6706,8 +6646,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6806,7 +6745,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6818,8 +6757,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6875,7 +6813,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -6887,8 +6825,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7013,7 +6950,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7025,8 +6962,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7090,7 +7026,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7102,8 +7038,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7205,7 +7140,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7217,8 +7152,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7275,7 +7209,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7287,8 +7221,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7363,7 +7296,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7375,8 +7308,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7432,7 +7364,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7444,8 +7376,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7547,7 +7478,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7559,8 +7490,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7616,7 +7546,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7628,8 +7558,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7754,7 +7683,7 @@ describe("Entity builder", () => { columnType: "timestamptz", type: "date", nullable: false, - onCreate: expect.any(Function), + defaultRaw: "now()", name: "created_at", fieldName: "created_at", @@ -7766,8 +7695,7 @@ describe("Entity builder", () => { columnType: "timestamptz", type: "date", nullable: false, - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + defaultRaw: "now()", name: "updated_at", fieldName: "updated_at", @@ -7825,7 +7753,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7837,8 +7765,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7894,7 +7821,7 @@ describe("Entity builder", () => { name: "created_at", fieldName: "created_at", defaultRaw: "now()", - onCreate: expect.any(Function), + nullable: false, getter: false, setter: false, @@ -7906,8 +7833,7 @@ describe("Entity builder", () => { name: "updated_at", fieldName: "updated_at", defaultRaw: "now()", - onCreate: expect.any(Function), - onUpdate: expect.any(Function), + nullable: false, getter: false, setter: false, diff --git a/packages/core/utils/src/dml/helpers/entity-builder/define-property.ts b/packages/core/utils/src/dml/helpers/entity-builder/define-property.ts index dcd68a6567..cc0d1b3dae 100644 --- a/packages/core/utils/src/dml/helpers/entity-builder/define-property.ts +++ b/packages/core/utils/src/dml/helpers/entity-builder/define-property.ts @@ -80,7 +80,6 @@ const SPECIAL_PROPERTIES: { nullable: false, fieldName: field.fieldName, defaultRaw: "now()", - onCreate: () => new Date(), })(MikroORMEntity.prototype, field.fieldName) }, updated_at: (MikroORMEntity, field) => { @@ -90,8 +89,6 @@ const SPECIAL_PROPERTIES: { nullable: false, fieldName: field.fieldName, defaultRaw: "now()", - onCreate: () => new Date(), - onUpdate: () => new Date(), })(MikroORMEntity.prototype, field.fieldName) }, deleted_at: (MikroORMEntity, field, tableName) => { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts index e4492dffd1..64f802727c 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__fixtures__/workflow_scheduled.ts @@ -5,11 +5,19 @@ import { StepResponse, } from "@medusajs/framework/workflows-sdk" -export const createScheduled = (name: string, schedule?: SchedulerOptions) => { +export const createScheduled = ( + name: string, + next: () => void, + schedule?: SchedulerOptions +) => { const workflowScheduledStepInvoke = jest.fn((input, { container }) => { - return new StepResponse({ - testValue: container.resolve("test-value"), - }) + try { + return new StepResponse({ + testValue: container.resolve("test-value", { allowUnregistered: true }), + }) + } finally { + next() + } }) const step = createStep("step_1", workflowScheduledStepInvoke) diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 6632842364..20e7094fd4 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -6,9 +6,11 @@ import { import { Context, IWorkflowEngineService, + Logger, RemoteQueryFunction, } from "@medusajs/framework/types" import { + ContainerRegistrationKeys, Module, Modules, promiseAll, @@ -41,6 +43,7 @@ import { workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { container, MedusaContainer } from "@medusajs/framework" jest.setTimeout(60000) @@ -54,15 +57,44 @@ const failTrap = (done) => { }, 5000) } +function times(num) { + let resolver + let counter = 0 + const promise = new Promise((resolve) => { + resolver = resolve + }) + + return { + next: () => { + counter += 1 + if (counter === num) { + resolver() + } + }, + // Force resolution after 10 seconds to prevent infinite awaiting + promise: Promise.race([ + promise, + new Promise((_, reject) => { + setTimeoutSync( + () => reject("times has not been resolved after 10 seconds."), + 10000 + ) + }), + ]), + } +} + moduleIntegrationTestRunner({ moduleName: Modules.WORKFLOW_ENGINE, resolve: __dirname + "/../..", testSuite: ({ service: workflowOrcModule, medusaApp }) => { describe("Workflow Orchestrator module", function () { let query: RemoteQueryFunction + let sharedContainer_: MedusaContainer beforeEach(() => { query = medusaApp.query + sharedContainer_ = medusaApp.sharedContainer }) it(`should export the appropriate linkable configuration`, () => { @@ -797,7 +829,6 @@ moduleIntegrationTestRunner({ describe("Scheduled workflows", () => { beforeEach(() => { jest.clearAllMocks() - jest.useFakeTimers() // Register test-value in the container for all tests const sharedContainer = @@ -809,62 +840,48 @@ moduleIntegrationTestRunner({ ) }) - afterEach(() => { - jest.useRealTimers() - }) - it("should execute a scheduled workflow", async () => { - const spy = createScheduled("standard", { - cron: "0 0 * * * *", // Runs at the start of every hour - }) - - expect(spy).toHaveBeenCalledTimes(0) - - await jest.runOnlyPendingTimersAsync() - - expect(spy).toHaveBeenCalledTimes(1) - - await jest.runOnlyPendingTimersAsync() + const wait = times(2) + const spy = createScheduled("standard", wait.next) + await wait.promise expect(spy).toHaveBeenCalledTimes(2) + WorkflowManager.unregister("standard") }) it("should stop executions after the set number of executions", async () => { - const spy = await createScheduled("num-executions", { + const wait = times(2) + const spy = createScheduled("num-executions", wait.next, { interval: 1000, numberOfExecutions: 2, }) - expect(spy).toHaveBeenCalledTimes(0) - - await jest.advanceTimersByTimeAsync(1100) - - expect(spy).toHaveBeenCalledTimes(1) - - await jest.advanceTimersByTimeAsync(1100) - + await wait.promise expect(spy).toHaveBeenCalledTimes(2) - await jest.advanceTimersByTimeAsync(1100) - + // Make sure that on the next tick it doesn't execute again + await setTimeoutPromise(1100) expect(spy).toHaveBeenCalledTimes(2) + + WorkflowManager.unregister("num-execution") }) it("should remove scheduled workflow if workflow no longer exists", async () => { - const spy = await createScheduled("remove-scheduled", { + const wait = times(1) + const logger = sharedContainer_.resolve( + ContainerRegistrationKeys.LOGGER + ) + + const spy = createScheduled("remove-scheduled", wait.next, { interval: 1000, }) - const logSpy = jest.spyOn(console, "warn") - - expect(spy).toHaveBeenCalledTimes(0) - - await jest.advanceTimersByTimeAsync(1100) + const logSpy = jest.spyOn(logger, "warn") + await wait.promise expect(spy).toHaveBeenCalledTimes(1) - WorkflowManager["workflows"].delete("remove-scheduled") - await jest.advanceTimersByTimeAsync(1100) + await setTimeoutPromise(1100) expect(spy).toHaveBeenCalledTimes(1) expect(logSpy).toHaveBeenCalledWith( "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." @@ -872,23 +889,20 @@ moduleIntegrationTestRunner({ }) it("the scheduled workflow should have access to the shared container", async () => { - const spy = await createScheduled("shared-container-job", { + const wait = times(1) + + const spy = await createScheduled("shared-container-job", wait.next, { interval: 1000, - numberOfExecutions: 1, }) + await wait.promise - const initialCallCount = spy.mock.calls.length + expect(spy).toHaveBeenCalledTimes(1) - await jest.advanceTimersByTimeAsync(1100) - - expect(spy).toHaveBeenCalledTimes(initialCallCount + 1) + console.log(spy.mock.results) expect(spy).toHaveReturnedWith( expect.objectContaining({ output: { testValue: "test" } }) ) - - await jest.advanceTimersByTimeAsync(1100) - - expect(spy).toHaveBeenCalledTimes(initialCallCount + 1) + WorkflowManager.unregister("shared-container-job") }) it("should fetch an idempotent workflow after its completion", async () => { @@ -931,6 +945,120 @@ moduleIntegrationTestRunner({ expect(executionsListAfter).toHaveLength(1) }) }) + + describe("Cleaner job", function () { + it("should remove expired executions of finished workflows and keep the others", async () => { + const doneWorkflowId = "done-workflow-" + ulid() + createWorkflow({ name: doneWorkflowId, retentionTime: 1 }, () => { + return new WorkflowResponse("done") + }) + + const failingWorkflowId = "failing-workflow-" + ulid() + const failingStep = createStep("failing-step", () => { + throw new Error("I am failing") + }) + createWorkflow({ name: failingWorkflowId, retentionTime: 1 }, () => { + failingStep() + }) + + const revertingStep = createStep( + "reverting-step", + () => { + throw new Error("I am reverting") + }, + () => { + return new StepResponse("reverted") + } + ) + + const revertingWorkflowId = "reverting-workflow-" + ulid() + createWorkflow( + { name: revertingWorkflowId, retentionTime: 1 }, + () => { + revertingStep() + return new WorkflowResponse("reverted") + } + ) + + const runningWorkflowId = "running-workflow-" + ulid() + const longRunningStep = createStep("long-running-step", async () => { + await setTimeoutPromise(10000) + return new StepResponse("long running finished") + }) + createWorkflow({ name: runningWorkflowId, retentionTime: 1 }, () => { + longRunningStep().config({ async: true, backgroundExecution: true }) + return new WorkflowResponse("running workflow started") + }) + + const notExpiredWorkflowId = "not-expired-workflow-" + ulid() + createWorkflow( + { name: notExpiredWorkflowId, retentionTime: 1000 }, + () => { + return new WorkflowResponse("not expired") + } + ) + + const trx_done = "trx-done-" + ulid() + const trx_failed = "trx-failed-" + ulid() + const trx_reverting = "trx-reverting-" + ulid() + const trx_running = "trx-running-" + ulid() + const trx_not_expired = "trx-not-expired-" + ulid() + + // run workflows + await workflowOrcModule.run(doneWorkflowId, { + transactionId: trx_done, + }) + + await workflowOrcModule.run(failingWorkflowId, { + transactionId: trx_failed, + throwOnError: false, + }) + + await workflowOrcModule.run(revertingWorkflowId, { + transactionId: trx_reverting, + throwOnError: false, + }) + + await workflowOrcModule.run(runningWorkflowId, { + transactionId: trx_running, + }) + + await workflowOrcModule.run(notExpiredWorkflowId, { + transactionId: trx_not_expired, + }) + + const executions = await workflowOrcModule.listWorkflowExecutions() + expect(executions).toHaveLength(5) + + await setTimeoutPromise(2000) + + // Manually trigger cleaner + await (workflowOrcModule as any).workflowOrchestratorService_[ + "inMemoryDistributedTransactionStorage_" + ]["clearExpiredExecutions"]() + + let remainingExecutions = + await workflowOrcModule.listWorkflowExecutions() + + expect(remainingExecutions).toHaveLength(2) + + const remainingTrxIds = remainingExecutions + .map((e) => e.transaction_id) + .sort() + + expect(remainingTrxIds).toEqual([trx_not_expired, trx_running].sort()) + + const notExpiredExec = remainingExecutions.find( + (e) => e.transaction_id === trx_not_expired + ) + expect(notExpiredExec?.state).toBe(TransactionState.DONE) + + const runningExec = remainingExecutions.find( + (e) => e.transaction_id === trx_running + ) + expect(runningExec?.state).toBe(TransactionState.INVOKING) + }) + }) }) }, }) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts index cd179cdb39..b8455cad5c 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflow-orchestrator.ts @@ -87,6 +87,7 @@ const AnySubscriber = "any" export class WorkflowOrchestratorService { private subscribers: Subscribers = new Map() private container_: MedusaContainer + private inMemoryDistributedTransactionStorage_: InMemoryDistributedTransactionStorage constructor({ inMemoryDistributedTransactionStorage, @@ -97,11 +98,21 @@ export class WorkflowOrchestratorService { sharedContainer: MedusaContainer }) { this.container_ = sharedContainer + this.inMemoryDistributedTransactionStorage_ = + inMemoryDistributedTransactionStorage inMemoryDistributedTransactionStorage.setWorkflowOrchestratorService(this) DistributedTransaction.setStorage(inMemoryDistributedTransactionStorage) WorkflowScheduler.setStorage(inMemoryDistributedTransactionStorage) } + async onApplicationStart() { + await this.inMemoryDistributedTransactionStorage_.onApplicationStart() + } + + async onApplicationShutdown() { + await this.inMemoryDistributedTransactionStorage_.onApplicationShutdown() + } + private async triggerParentStep(transaction, result) { const metadata = transaction.flow.metadata const { parentStepIdempotencyKey } = metadata ?? {} diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 305cd14b20..563f1d8025 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -43,7 +43,6 @@ export class WorkflowsModuleService< protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService protected workflowOrchestratorService_: WorkflowOrchestratorService protected manager_: SqlEntityManager - private clearTimeout_: NodeJS.Timeout constructor( { @@ -65,16 +64,10 @@ export class WorkflowsModuleService< __hooks = { onApplicationStart: async () => { - await this.clearExpiredExecutions() - - this.clearTimeout_ = setInterval(async () => { - try { - await this.clearExpiredExecutions() - } catch {} - }, 1000 * 60 * 60) + await this.workflowOrchestratorService_.onApplicationStart() }, onApplicationShutdown: async () => { - clearInterval(this.clearTimeout_) + await this.workflowOrchestratorService_.onApplicationShutdown() }, } @@ -289,14 +282,6 @@ export class WorkflowsModuleService< return this.workflowOrchestratorService_.unsubscribe(args as any) } - private async clearExpiredExecutions() { - return this.manager_.execute(` - DELETE FROM workflow_execution - WHERE retention_time IS NOT NULL AND - updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); - `) - } - @InjectSharedContext() async cancel>( workflowIdOrWorkflow: TWorkflow, diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 14553aeb38..22d32c589b 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -21,9 +21,9 @@ import { MedusaError, TransactionState, TransactionStepState, - isDefined, isPresent, } from "@medusajs/framework/utils" +import { raw } from "@mikro-orm/core" import { WorkflowOrchestratorService } from "@services" import { type CronExpression, parseExpression } from "cron-parser" import { WorkflowExecution } from "../models/workflow-execution" @@ -61,7 +61,8 @@ export class InMemoryDistributedTransactionStorage private logger_: Logger private workflowOrchestratorService_: WorkflowOrchestratorService - private storage: Map = new Map() + private storage: Map> = + new Map() private scheduled: Map< string, { @@ -74,6 +75,8 @@ export class InMemoryDistributedTransactionStorage private retries: Map = new Map() private timeouts: Map = new Map() + private clearTimeout_: NodeJS.Timeout + constructor({ workflowExecutionService, logger, @@ -85,6 +88,18 @@ export class InMemoryDistributedTransactionStorage this.logger_ = logger } + async onApplicationStart() { + this.clearTimeout_ = setInterval(async () => { + try { + await this.clearExpiredExecutions() + } catch {} + }, 1000 * 60 * 60) + } + + async onApplicationShutdown() { + clearInterval(this.clearTimeout_) + } + setWorkflowOrchestratorService(workflowOrchestratorService) { this.workflowOrchestratorService_ = workflowOrchestratorService } @@ -122,17 +137,6 @@ export class InMemoryDistributedTransactionStorage isCancelling?: boolean } ): Promise { - const data = this.storage.get(key) - - if (data) { - return data - } - - const { idempotent, store, retentionTime } = options ?? {} - if (!idempotent && !(store && isDefined(retentionTime))) { - return - } - const [_, workflowId, transactionId] = key.split(":") const trx: InferEntityType | undefined = await this.workflowExecutionService_ @@ -153,6 +157,7 @@ export class InMemoryDistributedTransactionStorage .catch(() => undefined) if (trx) { + const { idempotent } = options ?? {} const execution = trx.execution as TransactionFlow if (!idempotent) { @@ -187,10 +192,6 @@ export class InMemoryDistributedTransactionStorage return } - async list(): Promise { - return Array.from(this.storage.values()) - } - async save( key: string, data: TransactionCheckpoint, @@ -237,7 +238,11 @@ export class InMemoryDistributedTransactionStorage } } - this.storage.set(key, data) + const { flow, errors } = data + this.storage.set(key, { + flow, + errors, + }) // Optimize DB operations - only perform when necessary if (hasFinished) { @@ -272,14 +277,22 @@ export class InMemoryDistributedTransactionStorage */ const currentFlow = data.flow - const getOptions = { - ...options, - isCancelling: !!data.flow.cancelledAt, - } as Parameters[1] + const rawData = this.storage.get(key) + let data_ = {} as TransactionCheckpoint + if (rawData) { + data_ = rawData as TransactionCheckpoint + } else { + const getOptions = { + ...options, + isCancelling: !!data.flow.cancelledAt, + } as Parameters[1] - const { flow: latestUpdatedFlow } = - (await this.get(key, getOptions)) ?? - ({ flow: {} } as { flow: TransactionFlow }) + data_ = + (await this.get(key, getOptions)) ?? + ({ flow: {} } as TransactionCheckpoint) + } + + const { flow: latestUpdatedFlow } = data_ if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) { /** @@ -613,4 +626,25 @@ export class InMemoryDistributedTransactionStorage throw e } } + + async clearExpiredExecutions(): Promise { + await this.workflowExecutionService_.delete({ + retention_time: { + $ne: null, + }, + updated_at: { + $lte: raw( + (alias) => + `CURRENT_TIMESTAMP - (INTERVAL '1 second' * retention_time)` + ), + }, + state: { + $in: [ + TransactionState.DONE, + TransactionState.FAILED, + TransactionState.REVERTED, + ], + }, + }) + } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index b656404b2b..47280fc4af 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -1072,7 +1072,6 @@ moduleIntegrationTestRunner({ WorkflowManager["workflows"].delete("remove-scheduled") await setTimeout(1100) - 0 expect(spy).toHaveBeenCalledTimes(1) expect(logSpy).toHaveBeenCalledWith( "Tried to execute a scheduled workflow with ID remove-scheduled that does not exist, removing it from the scheduler." @@ -1107,6 +1106,120 @@ moduleIntegrationTestRunner({ }) }) }) + + describe("Cleaner job", function () { + it("should remove expired executions of finished workflows and keep the others", async () => { + const doneWorkflowId = "done-workflow-" + ulid() + createWorkflow({ name: doneWorkflowId, retentionTime: 1 }, () => { + return new WorkflowResponse("done") + }) + + const failingWorkflowId = "failing-workflow-" + ulid() + const failingStep = createStep("failing-step", () => { + throw new Error("I am failing") + }) + createWorkflow({ name: failingWorkflowId, retentionTime: 1 }, () => { + failingStep() + }) + + const revertingStep = createStep( + "reverting-step", + () => { + throw new Error("I am reverting") + }, + () => { + return new StepResponse("reverted") + } + ) + + const revertingWorkflowId = "reverting-workflow-" + ulid() + createWorkflow( + { name: revertingWorkflowId, retentionTime: 1 }, + () => { + revertingStep() + return new WorkflowResponse("reverted") + } + ) + + const runningWorkflowId = "running-workflow-" + ulid() + const longRunningStep = createStep("long-running-step", async () => { + await setTimeout(10000) + return new StepResponse("long running finished") + }) + createWorkflow({ name: runningWorkflowId, retentionTime: 1 }, () => { + longRunningStep().config({ async: true, backgroundExecution: true }) + return new WorkflowResponse("running workflow started") + }) + + const notExpiredWorkflowId = "not-expired-workflow-" + ulid() + createWorkflow( + { name: notExpiredWorkflowId, retentionTime: 1000 }, + () => { + return new WorkflowResponse("not expired") + } + ) + + const trx_done = "trx-done-" + ulid() + const trx_failed = "trx-failed-" + ulid() + const trx_reverting = "trx-reverting-" + ulid() + const trx_running = "trx-running-" + ulid() + const trx_not_expired = "trx-not-expired-" + ulid() + + // run workflows + await workflowOrcModule.run(doneWorkflowId, { + transactionId: trx_done, + }) + + await workflowOrcModule.run(failingWorkflowId, { + transactionId: trx_failed, + throwOnError: false, + }) + + await workflowOrcModule.run(revertingWorkflowId, { + transactionId: trx_reverting, + throwOnError: false, + }) + + await workflowOrcModule.run(runningWorkflowId, { + transactionId: trx_running, + }) + + await workflowOrcModule.run(notExpiredWorkflowId, { + transactionId: trx_not_expired, + }) + + let executions = await workflowOrcModule.listWorkflowExecutions() + expect(executions).toHaveLength(5) + + await setTimeout(2000) + + // Manually trigger cleaner + await (workflowOrcModule as any).workflowOrchestratorService_[ + "redisDistributedTransactionStorage_" + ]["clearExpiredExecutions"]() + + let remainingExecutions = + await workflowOrcModule.listWorkflowExecutions() + + expect(remainingExecutions).toHaveLength(2) + + const remainingTrxIds = remainingExecutions + .map((e) => e.transaction_id) + .sort() + + expect(remainingTrxIds).toEqual([trx_not_expired, trx_running].sort()) + + const notExpiredExec = remainingExecutions.find( + (e) => e.transaction_id === trx_not_expired + ) + expect(notExpiredExec?.state).toBe(TransactionState.DONE) + + const runningExec = remainingExecutions.find( + (e) => e.transaction_id === trx_running + ) + expect(runningExec?.state).toBe(TransactionState.INVOKING) + }) + }) }) }, }) diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts index ad7028f429..2860ca1b1f 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -96,7 +96,9 @@ moduleIntegrationTestRunner({ if (event.eventType === "onFinish") { try { expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect( + step1InvokeMock.mock.calls.length + ).toBeGreaterThanOrEqual(1) expect(step2InvokeMock).toHaveBeenCalledTimes(1) expect(transformMock).toHaveBeenCalledTimes(1) diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index e19b1bc7ff..445c614b4c 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -47,7 +47,6 @@ export class WorkflowsModuleService< protected workflowOrchestratorService_: WorkflowOrchestratorService protected redisDisconnectHandler_: () => Promise protected manager_: SqlEntityManager - private clearTimeout_: NodeJS.Timeout constructor( { @@ -72,13 +71,6 @@ export class WorkflowsModuleService< __hooks = { onApplicationStart: async () => { await this.workflowOrchestratorService_.onApplicationStart() - - await this.clearExpiredExecutions() - this.clearTimeout_ = setInterval(async () => { - try { - await this.clearExpiredExecutions() - } catch {} - }, 1000 * 60 * 60) }, onApplicationPrepareShutdown: async () => { await this.workflowOrchestratorService_.onApplicationPrepareShutdown() @@ -86,7 +78,6 @@ export class WorkflowsModuleService< onApplicationShutdown: async () => { await this.workflowOrchestratorService_.onApplicationShutdown() await this.redisDisconnectHandler_() - clearInterval(this.clearTimeout_) }, } @@ -301,14 +292,6 @@ export class WorkflowsModuleService< return this.workflowOrchestratorService_.unsubscribe(args as any) } - private async clearExpiredExecutions() { - return this.manager_.execute(` - DELETE FROM workflow_execution - WHERE retention_time IS NOT NULL AND - updated_at <= (CURRENT_TIMESTAMP - INTERVAL '1 second' * retention_time); - `) - } - @InjectSharedContext() async cancel( workflowId: string, diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 978a8835b7..99e2210646 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -1,5 +1,4 @@ import { - DistributedTransaction, DistributedTransactionType, IDistributedSchedulerStorage, IDistributedTransactionStorage, @@ -15,13 +14,13 @@ import { } from "@medusajs/framework/orchestration" import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" import { - isDefined, isPresent, MedusaError, promiseAll, TransactionState, TransactionStepState, } from "@medusajs/framework/utils" +import { raw } from "@mikro-orm/core" import { WorkflowOrchestratorService } from "@services" import { Queue, RepeatOptions, Worker } from "bullmq" import Redis from "ioredis" @@ -33,6 +32,9 @@ enum JobType { TRANSACTION_TIMEOUT = "transaction_timeout", } +const ONE_HOUR_IN_MS = 1000 * 60 * 60 +const REPEATABLE_CLEARER_JOB_ID = "clear-expired-executions" + export class RedisDistributedTransactionStorage implements IDistributedTransactionStorage, IDistributedSchedulerStorage { @@ -48,6 +50,9 @@ export class RedisDistributedTransactionStorage private jobQueue?: Queue private worker: Worker private jobWorker?: Worker + private cleanerQueueName: string + private cleanerWorker_: Worker + private cleanerQueue_?: Queue #isWorkerMode: boolean = false @@ -72,6 +77,7 @@ export class RedisDistributedTransactionStorage this.logger_ = logger this.redisClient = redisConnection this.redisWorkerConnection = redisWorkerConnection + this.cleanerQueueName = "workflows-cleaner" this.queueName = redisQueueName this.jobQueueName = redisJobQueueName this.queue = new Queue(redisQueueName, { connection: this.redisClient }) @@ -80,6 +86,11 @@ export class RedisDistributedTransactionStorage connection: this.redisClient, }) : undefined + this.cleanerQueue_ = isWorkerMode + ? new Queue(this.cleanerQueueName, { + connection: this.redisClient, + }) + : undefined this.#isWorkerMode = isWorkerMode } @@ -87,11 +98,21 @@ export class RedisDistributedTransactionStorage // Close worker gracefully, i.e. wait for the current jobs to finish await this.worker?.close() await this.jobWorker?.close() + + const repeatableJobs = (await this.cleanerQueue_?.getRepeatableJobs()) ?? [] + for (const job of repeatableJobs) { + if (job.id === REPEATABLE_CLEARER_JOB_ID) { + await this.cleanerQueue_?.removeRepeatableByKey(job.key) + } + } + + await this.cleanerWorker_?.close() } async onApplicationShutdown() { await this.queue?.close() await this.jobQueue?.close() + await this.cleanerQueue_?.close() } async onApplicationStart() { @@ -151,6 +172,27 @@ export class RedisDistributedTransactionStorage }, workerOptions ) + + this.cleanerWorker_ = new Worker( + this.cleanerQueueName, + async () => { + await this.clearExpiredExecutions() + }, + { connection: this.redisClient } + ) + + await this.cleanerQueue_?.add( + "cleaner", + {}, + { + repeat: { + every: ONE_HOUR_IN_MS, + }, + jobId: REPEATABLE_CLEARER_JOB_ID, + removeOnComplete: true, + removeOnFail: true, + } + ) } } @@ -230,19 +272,6 @@ export class RedisDistributedTransactionStorage key: string, options?: TransactionOptions & { isCancelling?: boolean } ): Promise { - const data = await this.redisClient.get(key) - - if (data) { - const parsedData = JSON.parse(data) as TransactionCheckpoint - return parsedData - } - - // Not in Redis either - check database if needed - const { idempotent, store, retentionTime } = options ?? {} - if (!idempotent && !(store && isDefined(retentionTime))) { - return - } - const [_, workflowId, transactionId] = key.split(":") const trx = await this.workflowExecutionService_ .list( @@ -262,6 +291,7 @@ export class RedisDistributedTransactionStorage .catch(() => undefined) if (trx) { + const { idempotent } = options ?? {} const execution = trx.execution as TransactionFlow if (!idempotent) { @@ -296,38 +326,6 @@ export class RedisDistributedTransactionStorage return } - async list(): Promise { - // Replace Redis KEYS with SCAN to avoid blocking the server - const transactions: TransactionCheckpoint[] = [] - let cursor = "0" - - do { - // Use SCAN instead of KEYS to avoid blocking Redis - const [nextCursor, keys] = await this.redisClient.scan( - cursor, - "MATCH", - DistributedTransaction.keyPrefix + ":*", - "COUNT", - 100 // Fetch in reasonable batches - ) - - cursor = nextCursor - - if (keys.length) { - // Use mget to batch retrieve multiple keys at once - const values = await this.redisClient.mget(keys) - - for (const value of values) { - if (value) { - transactions.push(JSON.parse(value)) - } - } - } - } while (cursor !== "0") - - return transactions - } - async save( key: string, data: TransactionCheckpoint, @@ -364,7 +362,11 @@ export class RedisDistributedTransactionStorage const shouldSetNX = isNotStarted && isManualTransactionId // Prepare operations to be executed in batch or pipeline - const stringifiedData = JSON.stringify(data) + const data_ = { + errors: data.errors, + flow: data.flow, + } + const stringifiedData = JSON.stringify(data_) const pipeline = this.redisClient.pipeline() // Execute Redis operations @@ -614,14 +616,22 @@ export class RedisDistributedTransactionStorage */ const currentFlow = data.flow - const getOptions = { - ...options, - isCancelling: !!data.flow.cancelledAt, - } as Parameters[1] + const rawData = await this.redisClient.get(key) + let data_ = {} as TransactionCheckpoint + if (rawData) { + data_ = JSON.parse(rawData) + } else { + const getOptions = { + ...options, + isCancelling: !!data.flow.cancelledAt, + } as Parameters[1] - const { flow: latestUpdatedFlow } = - (await this.get(key, getOptions)) ?? - ({ flow: {} } as { flow: TransactionFlow }) + data_ = + (await this.get(key, getOptions)) ?? + ({ flow: {} } as TransactionCheckpoint) + } + + const { flow: latestUpdatedFlow } = data_ if (!isInitialCheckpoint && !isPresent(latestUpdatedFlow)) { /** @@ -728,4 +738,25 @@ export class RedisDistributedTransactionStorage throw new SkipExecutionError("Already finished by another execution") } } + + async clearExpiredExecutions() { + await this.workflowExecutionService_.delete({ + retention_time: { + $ne: null, + }, + updated_at: { + $lte: raw( + (alias) => + `CURRENT_TIMESTAMP - (INTERVAL '1 second' * "retention_time")` + ), + }, + state: { + $in: [ + TransactionState.DONE, + TransactionState.FAILED, + TransactionState.REVERTED, + ], + }, + }) + } }