Commit Graph

69 Commits

Author SHA1 Message Date
Adrien de Peretti
144f0f4e2e chore(event-bus, workflow-engine): Enable more granular queues configuration (#14201)
Summary

  This PR adds BullMQ queue and worker configuration options to the workflow-engine-redis module, bringing feature parity with the event-bus-redis module. It also introduces per-queue
  configuration options for fine-grained control over the three internal queues (main, job, and cleaner).

  Key changes:
  - Added per-queue BullMQ configuration options (mainQueueOptions, jobQueueOptions, cleanerQueueOptions and their worker counterparts) with shared defaults
  - Unified Redis option naming across modules: deprecated url → redisUrl, options → redisOptions (with backward compatibility)
  - Moved configuration resolution to the loader and registered options in the DI container
  - Added comprehensive JSDoc documentation for all configuration options
  - Added unit tests for option merging and queue/worker configuration

  Configuration Example

```ts
  // Simple configuration - same options for all queues
  {
    redisUrl: "redis://localhost:6379",
    queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } },
    workerOptions: { concurrency: 10 }
  }
```

```ts
  // Advanced configuration - per-queue overrides
  {
    redisUrl: "redis://localhost:6379",
    workerOptions: { concurrency: 10 },        // shared default
    jobWorkerOptions: { concurrency: 5 },      // override for scheduled workflows
    cleanerWorkerOptions: { concurrency: 1 }   // override for cleanup (low priority)
  }
```
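
The "shared defaults with per-queue overrides" behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual loader code: the `resolveWorkerOptions` helper, the `ModuleOptions` shape, the `mainWorkerOptions` key, and the shallow-merge strategy are all assumptions made for the example.

```typescript
// Hypothetical option shapes; the real module types may differ.
type WorkerOptions = { concurrency?: number }

interface ModuleOptions {
  workerOptions?: WorkerOptions // shared default for every worker
  mainWorkerOptions?: WorkerOptions // assumed name, by analogy with the two below
  jobWorkerOptions?: WorkerOptions
  cleanerWorkerOptions?: WorkerOptions
}

type QueueName = "main" | "job" | "cleaner"

// Shallow-merge the shared default with the per-queue override (assumed strategy).
function resolveWorkerOptions(
  options: ModuleOptions,
  queue: QueueName
): WorkerOptions {
  const overrides: Record<QueueName, WorkerOptions | undefined> = {
    main: options.mainWorkerOptions,
    job: options.jobWorkerOptions,
    cleaner: options.cleanerWorkerOptions,
  }
  // Keys set in the override win; everything else falls back to the shared default.
  return { ...options.workerOptions, ...overrides[queue] }
}
```

With the advanced configuration shown above, this sketch would resolve the job worker to a concurrency of 5 and leave the main worker on the shared value of 10.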
2025-12-05 12:03:12 +00:00
Adrien de Peretti
6c3ec528f1 fix(): Workflow engine redis worker instance in worker mode (#14099)
* fix(): Workflow engine redis worker instance in worker mode

* Create wicked-tips-buy.md
2025-11-24 09:31:36 +01:00
Adrien de Peretti
1ea932a56f fix(): Identify step that should require a save on checkpoint (#14037)
* fix(): Identify step that force save checkpoint

* Create sour-peas-provide.md
2025-11-17 12:47:57 +01:00
Adrien de Peretti
d51ae2768b chore(workflow-engine-*): cleanup and improvements (#13789)
**What**
Cleanup recent work on workflows
2025-10-23 10:50:24 +00:00
Sebastian Rindom
bad0858348 fix: prevent jobId collisions on workflow step retries (#13786)
## Summary

**What** — What changes are introduced in this PR?

This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt due to Bull queue jobId collisions preventing retry jobs from executing.

**Why** — Why are these changes relevant or necessary?  

Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once, schedule a retry, but the retry job would never execute, causing workflows to hang indefinitely.

**How** — How have these changes been implemented?

**Root Cause:** Bull queue was rejecting retry jobs because they had identical jobIds to the async execution jobs that already completed. Both used the format: `retry:workflow:transaction:step_id:attempts`.

**Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds:
- Async execution (interval=0): `retry:...:step_id:1`
- Retry scheduling (interval>0): `retry:...:step_id:1:retry`

Updated methods: `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter.
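
As a rough sketch of the jobId scheme described above (the signature below is hypothetical; the real `getJobId()` in `workflow-orchestrator-storage.ts` may take different parameters):

```typescript
// Hypothetical signature, for illustration only.
function getJobId(
  prefix: string,
  workflowId: string,
  transactionId: string,
  stepId: string,
  attempts: number,
  interval = 0
): string {
  const parts = [prefix, workflowId, transactionId, stepId, String(attempts)]
  // Retry scheduling (interval > 0) gets a distinct suffix so that it cannot
  // collide with the already completed async execution job.
  if (interval > 0) {
    parts.push("retry")
  }
  return parts.join(":")
}
```

Under these assumptions, an async execution and a scheduled retry for the same step and attempt produce two different ids, so the queue no longer rejects the second one as a duplicate.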

**Testing** — How have these changes been tested, or how can the reviewer test the feature?

Added integration test `retry-interval.spec.ts` that verifies:
1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times
2. Retry intervals are approximately 1 second between attempts
3. Workflow completes successfully after retries
4. Uses proper async workflow completion pattern with `subscribe()` and `onFinish` event

---

## Examples

```ts
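// Import path assumed for a Medusa v2 project
import { createStep } from "@medusajs/framework/workflows-sdk"

// Illustrative module-level counter (not part of the original snippet) so the
// handler can tell which attempt is running
let attempts = 0

// Example workflow step that would previously get stuck
export const testRetryStep = createStep(
  {
    name: "test-retry-step",
    async: true,
    retryInterval: 1, // 1 second retry interval
    maxRetries: 3,
  },
  async (input: any) => {
    attempts++
    // Simulate failure on first 2 attempts
    if (attempts < 3) {
      throw new Error("Temporary failure - will retry")
    }
    return { success: true }
  }
)
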
// Before fix: Step would fail once, schedule retry, but retry job never fired (jobId collision)
// After fix: Step properly retries up to 3 times with 1-second intervals
```

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-10-21 18:27:21 +00:00
Adrien de Peretti
516f5a3896 fix: workflow async concurrency (#13769)
* executeAsync

* || 1

* wip

* stepId

* stepId

* wip

* wip

* continue versioning management changes

* fix and improve concurrency

* update in memory engine

* remove duplicated test

* fix script

* Create weak-drinks-confess.md

* fixes

* fix

* fix

* continuation

* centralize merge checkpoint

* centralize merge checkpoint

* fix locking

* rm only

* Continue improvements and fixes

* fixes

* fixes

* hasAwaiting will be recomputed

* fix orchestrator engine

* bump version on async parallel steps only

* mark as delivered fix

* changeset

* check partitions

* avoid saving when having parent step

* cart test

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
2025-10-20 15:29:19 +02:00
Adrien de Peretti
b43b285125 fix(): Workflow save to db + index integration instability (#13707)
**What**
- Fix index integration tests instability
- Fix workflow engine storage save to db
2025-10-08 08:45:15 +00:00
Adrien de Peretti
51859c38a7 chore(): Default caching configuration and graceful redis error handling (#13663)
* chore(): Default caching configuration and graceful redis error handling

* Create odd-moons-crash.md

* chore(): Default caching configuration and graceful redis error handling

* fixes

* address feedback

* revert(): Test utils init module fix

* reconnect

* reconnect

* reconnect
2025-10-06 17:57:11 +02:00
Adrien de Peretti
76aa4a48b3 fix(): workflows concurrency (#13645) 2025-10-02 11:11:38 -03:00
Adrien de Peretti
12a96a7c70 chore(): Move peer deps into a single package and re export from framework (#13439)
* chore(): Move peer deps into a single package and re export from framework

* WIP

* update core packages

* update cli and deps

* update medusa

* update exports path

* remove analyze

* update modules deps

* finalise changes

* fix yarn

* fix import

* Refactor peer dependencies into a single package

Consolidate peer dependencies into one package and re-export from the framework.

* update changeset

* Update .changeset/brown-cows-sleep.md

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>

* rm deps

* fix deps

* increase timeout

* upgrade version

* update versions

* update versions

* fixes

* update lock

* fix missing import

* fix missing import

---------

Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
2025-09-22 18:36:22 +02:00
Adrien de Peretti
9633e0676e fix(workflow-engine-redis): module option queueName wrongly used (#13570)
https://github.com/medusajs/medusa/issues/13452

**What**
Wrongly used module option for redis workflow engine
2025-09-22 10:33:51 +00:00
Carlos R. L. Rodrigues
68a643bb3a fix(workflow-engine-redis): use worker connection (#13561)
What:
 * Regular redis connection does not have `maxRetriesPerRequest: null`, and the worker connection hangs.
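
A minimal sketch of deriving a worker-safe connection config from the regular one. The helper name and the local `RedisLikeOptions` type are assumptions for illustration; only `maxRetriesPerRequest` is an actual ioredis option.

```typescript
// Local stand-in for the ioredis options object (assumption for this sketch).
type RedisLikeOptions = {
  maxRetriesPerRequest?: number | null
  [key: string]: unknown
}

// Hypothetical helper: copy the regular options and force the setting that
// blocking worker connections need, regardless of what the base options say.
function toWorkerConnectionOptions(
  base: RedisLikeOptions = {}
): RedisLikeOptions {
  return { ...base, maxRetriesPerRequest: null }
}
```

The base options object is copied rather than mutated, so the regular connection keeps its own retry settings.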
2025-09-22 04:59:59 +00:00
Adrien de Peretti
cb716856b6 fix(engine): Always create cleaner job (#13557)
RESOLVES CORE-1206

**What**
Instead of removing the cleaner repeatable job, and risking its removal while other instances are still up, we always create it. The job is given an explicit id, and since job ids are unique, it won't be added again if it is already present.
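
The "always create, rely on the id" idea can be illustrated with an in-memory stand-in for the queue. This is not BullMQ and not the engine's code; the class, the job name, and the job id below are hypothetical.

```typescript
// In-memory model of a queue that ignores an add when the job id already exists.
class InMemoryQueue {
  private jobs = new Map<string, { name: string }>()

  // Returns true when the job was added, false when the id was already present.
  add(name: string, jobId: string): boolean {
    if (this.jobs.has(jobId)) {
      return false
    }
    this.jobs.set(jobId, { name })
    return true
  }

  get size(): number {
    return this.jobs.size
  }
}

// Every instance can call this on startup without coordinating with the others.
function ensureCleanerJob(queue: InMemoryQueue): boolean {
  return queue.add("cleaner", "clear-expired-executions") // hypothetical id
}
```

Because creation is idempotent, no instance ever needs to remove the job, which avoids the race described above.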
2025-09-19 14:38:14 +00:00
Adrien de Peretti
fc4d5f0ac9 chore(): Workflow engine timers and notification improvements (#13434)
RESOLVES CORE-1177

**What**
main changes are:
- not blocking execution when notifying
- timers management
- race condition checks improvements
2025-09-08 18:19:55 +00:00
Adrien de Peretti
d7692100e7 chore(orchestration): add support for autoRetry, maxAwaitingRetries, retryStep (#13391)
RESOLVES CORE-1163
RESOLVES CORE-1164

**What**

### Add support for non auto retryable steps.

When a step is configured with `maxRetries` and fails, it is marked as a temporary failure and retries automatically. That is the default behaviour. If you add `autoRetry: false`, a failing step is still marked as a temporary failure but is not retried automatically; you can then call the workflow engine's run to resume the workflow from the failing step and retry it.

### Add support for `maxAwaitingRetries`

When `retryIntervalAwaiting` is set, a step that is awaiting is retried after the specified interval, with no maximum number of retries. You can now set `maxAwaitingRetries` to cap the number of awaiting retries.

### Add support to manually retry an awaiting step

In some scenarios, such as a machine dying while a step is executing or a step taking longer than expected, you can now call `retryStep` on the workflow engine to force a retry of the step that appears to be stuck.
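
A simplified model of the failure handling described for `maxRetries` and `autoRetry`. The function, the outcome names, and the way retries are counted are assumptions made for illustration; the orchestrator's real bookkeeping may differ.

```typescript
// Only the two options discussed above; real step configs have more fields.
type StepRetryConfig = { maxRetries?: number; autoRetry?: boolean }

type FailureOutcome =
  | "retry-automatically"
  | "wait-for-manual-resume"
  | "permanent-failure"

// retriesUsed: how many retries have already been consumed by this step.
function onStepFailure(
  config: StepRetryConfig,
  retriesUsed: number
): FailureOutcome {
  const maxRetries = config.maxRetries ?? 0
  if (retriesUsed >= maxRetries) {
    return "permanent-failure"
  }
  // Temporary failure: autoRetry defaults to true, so only an explicit
  // `false` hands control back to the caller.
  return config.autoRetry === false
    ? "wait-for-manual-resume"
    : "retry-automatically"
}
```

In the `wait-for-manual-resume` case, the caller would resume the workflow through the engine, as described in the first section above.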
2025-09-08 12:46:30 +00:00
Adrien de Peretti
2b89510df3 chore(): only execute race execution checks and publish message only for async workflows (#13396)
* chore(): only execute race execution checks for async workflows

* chore(): workflow redis publish only for async flows

* Create cyan-gorillas-poke.md

* chore(): workflow redis publish only for async flows

* fix negative check

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-09-04 09:12:09 +02:00
Adrien de Peretti
bd206cb250 chore(): improve workflow engine storage (#13345)
* chore(workflow-engines): Improve race condition management

* cleanup

* cleanup

* chore(workflow-engines): Improve race condition management

* chore(workflow-engines): Improve race condition management

* chore(workflow-engines): heartbeat extend TTL

* Refactor chore title for workflow engine improvements

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* chore(): Improve workflow execution db interaction

* update tests

* revert idempotent

* add run_id index + await deletion

* improve saving

* comment

* remove only

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-09-02 11:18:12 +02:00
Carlos R. L. Rodrigues
9412669e65 chore: idempotent cart operations (#13236)
* chore(core-flows): idempotent cart operations

* changeset

* add tests

* revert

* revert route

* promo test

* skip bugs

* fix test

* tests

* avoid workflow name conflict

* prevent nested workflow from being deleted until the top level parent finishes

* remove unused setTimeout

* update changeset

* rm comments

---------

Co-authored-by: adrien2p <adrien.deperetti@gmail.com>
2025-08-28 15:04:00 +02:00
Adrien de Peretti
ff152e7ace fix(orchestration): Use the step definition max retries on set step failure (#13319)
* fix(orchestration): Use the step definition max retries on set step failure

* Create sweet-turkeys-wait.md

* allow to force permanent failure

* update changeset
2025-08-28 14:35:31 +02:00
Adrien de Peretti
238e7d53c1 fix(wfe): should notify when finished + add state info (#12982) 2025-07-18 09:20:46 +02:00
Adrien de Peretti
eb83954f23 chore(workflow-engine-*): Align event subscribers management (#12976)
* chore(workflow-engine-*): Align event subscribers management

* Create nervous-eels-build.md
2025-07-16 16:34:47 +02:00
Adrien de Peretti
95d282e8ef fix: test utils events + workflow storage (#12834)
* feat(test-utils): Make event subscriber waiter robust and concurrent

* feat(test-utils): Make event subscriber waiter robust and concurrent

* fix workflows storage

* remove timeout

* Create gentle-teachers-doubt.md

* revert timestamp

* update changeset

* fix execution loop

* exit if no steps to await

* typo

* check next

* check next

* changeset

* skip when async steps

* wait workflow executions utils

* wait workflow executions utils

* wait workflow executions utils

* increase timeout

* break loop

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-06-30 13:34:08 +02:00
Adrien de Peretti
316a325b63 fix(workflow-engine-*): Cleanup expired executions and reduce redis storage usage (#12795) 2025-06-24 13:32:10 +02:00
Oli Juhl
1c987540aa fix(workflow-engine-redis): Ensure PK is set without errors (#12775)
* fix(workflow-engine-redis): Align naming of migration

* change approach
2025-06-19 20:19:01 +02:00
Adrien de Peretti
1a78476608 fix(workflow-sdk): Async/nested runAsStep propagation (#12675)
FIXES CLO-524

**What**
Add a hidden stepDefinition object to the step argument and ensure the runAsStep handlers rely on the latest definition when config is applied to the returned step. This ensures that async configuration and nested configuration propagate correctly.
2025-06-10 07:23:12 +00:00
Adrien de Peretti
7fdbf2a965 fix(workflows-sdk): Mismatched context usage within run as step (#12449)
**What**

Currently, runAsStep keeps a reference to the context of the workflow being run as a step, whereas the step is composed for the current workflow composition, not for the workflow being run as a step. The contexts are therefore mismatched, which leads to the wrong configuration being used for async workflows.

**BUG**
This fix allows runAsStep to use the current composition context to configure the step for the sub-workflow to be run.

**BUG BREAKING**
Fixes the step config being wrongly used to wrap async step handlers. Steps configured as async through .config that return a new step response will now mark themselves as successful without the need for background execution or a call to setStepSuccess (as originally expected).

**FEATURE**
This PR also adds support for cancelling a running transaction. The transaction is marked as being cancelled; once the current step finishes, the transaction is cancelled and starts compensating all previous steps, including the current one.

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-05-14 13:28:16 +00:00
Adrien de Peretti
ab22faaa52 Chore/test runner improvements (#12439)
**What**
Make sure there are no open handles left and that the shutdown functions are properly called. Refactor and improve the medusa test runner. Make sure all module instances are released and cleaned up.

**NOTE:**
In a separate PR we can continue investigating the memory growth over time while the tests execute.
2025-05-14 13:17:41 +00:00
Adrien de Peretti
9d29078b0d fix(workflow-engine-*): q text search (#12435) 2025-05-11 16:21:41 +02:00
Adrien de Peretti
80007f3afd feat(workflows-*): Allow to re run non idempotent but stored workflow with the same transaction id if considered done (#12362) 2025-05-06 17:17:49 +02:00
Adrien de Peretti
28958b2e26 Chore/orchestration storage improvements (#12178)
**What**
Cleanup and improve workflow storage utility
2025-04-18 08:35:23 +00:00
Adrien de Peretti
8618e6ee38 fix(): Properly handle workflow as step now that events are fixed entirely (#12196)
**What**
Now that event management is fixed throughout the workflow life cycle, run as step needs to leverage the workflow engine if present (which should always be the case for async workflows) to ensure continuation and the ability to mark the parent step in the parent workflow as success or failure.

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2025-04-17 09:34:19 +00:00
Carlos R. L. Rodrigues
31abba8cde fix(orchestrator): save checkpoint before async step (#12138) 2025-04-10 15:36:36 +00:00
Adrien de Peretti
13e159d8ad fix(workflow-engine-*): Prevent passing shared context reference (#11873)
* fix(workflow-engine-*): Prevent passing shared context reference

* fix(workflow-engine-*): Prevent passing shared context reference

* prevent tests from hanging

* fix event handling

* add integration tests

* use interval for scheduled in tests

* skip tests for now

* Create silent-glasses-enjoy.md

* fix cancel

* changeset

* push multiple aliases

* test multiple field alias

* increase wait time to index on test

---------

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
2025-04-09 10:39:29 +02:00
Carlos R. L. Rodrigues
0625f76cd4 chore(workflow-engine): export cancel method (#11844)
What:
  * Workflow engine exports the method `cancel` to revert a workflow.
2025-03-17 12:59:09 +00:00
Adrien de Peretti
fc652ea51e fix(workflow-engine-*): scheduled jobs interval (#11800)
**What**
Currently only cron patterns are supported by scheduled jobs, which can lead to issues. For example, you set the pattern to execute every hour at minute 0 and second 0 (it is expected to execute at exactly that moment), but if second 0 has already passed by the time the pattern is evaluated, the job won't be executed until the next scheduled cron execution.

This PR introduces the `interval` configuration, which lets you specify a delay between executions in ms (e.g. every minute -> 60 * 1000 ms). It ensures that once a job is executed, another one is scheduled for a minute later.

**Usage**
```ts
// jobs/job-1.ts
const thirtySeconds = 30 * 1000

export const config = {
  name: "job-1",
  schedule: {
    interval: thirtySeconds
  },
}
```
2025-03-13 15:05:13 +00:00
Carlos R. L. Rodrigues
6053ec3976 chore(workflow-engine-redis): remove repeatable jobs from old queue (#11822)
What:
  * Old deployments have repeatable jobs registered in the wrong queue. When the `server` instance picks up such a job, the workflow doesn't exist, so it calls to remove the job, which then removes the job from the new queue.
  * This PR cleans up any repeatable job from the queue that exclusively handles workflows.
2025-03-12 14:54:10 +00:00
Adrien de Peretti
72d2cf9207 fix(workflow-engines): race condition when retry interval is used (#11771) 2025-03-12 09:53:34 -03:00
Adrien de Peretti
cc8422d3a1 fix(workflow-engine-redis): Split the queues and respect worker mode for job executions (#11740)
**What**
Currently, the redis workflow engine does not make any distinction between worker modes. When starting as a server, the engine listens to the queue that contains everything and tries to execute the corresponding workflow, which does not exist since job workflows are not loaded in server mode. Now, a dedicated queue is created for jobs, and its worker is only started if the instance is not in server mode. To clean up the old queue, if the old queue triggers a scheduled job, that job is removed from the old queue, since the new worker instances will re-add it to the new queue.
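
The gating rule for the job worker can be sketched as below. The `WorkerMode` union and the helper name are assumptions for illustration, not the engine's actual code.

```typescript
// Assumed set of worker modes; "server" is the one named above.
type WorkerMode = "shared" | "worker" | "server"

// Hypothetical helper: the worker for the dedicated job queue is only started
// when the instance is not running in server mode, because job workflows are
// not loaded there.
function shouldStartJobWorker(mode: WorkerMode): boolean {
  return mode !== "server"
}
```

A server-only instance therefore never picks up scheduled jobs whose workflows it cannot resolve.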
2025-03-06 11:52:52 +00:00
Carlos R. L. Rodrigues
c8376a9f15 chore(medusa): clear workflow execution (#11200)
CLOSES: SUP-704
2025-02-03 10:47:32 +00:00
Adrien de Peretti
da3906efa4 fix: Unique constraint should account for soft deleted records (#11048)
FIXES FRMW-2878

**What**
Currently, the `one-to-one` unique constraints do not account for deleted records. This prevents inserting a new record with the same fk if another one is deleted.

**Caveat**
`hasOne` with FK option is meant to be a special case, for example a many to one - one to many without defining the other side of the relation. In that case we don't handle this behaviour and keep it as it is
2025-01-22 07:42:06 +00:00
Adrien de Peretti
0a077d48e1 chore(workflow-engine): Migrate to DML (#10477)
RESOLVES FRMW-2832
RESOLVES FRMW-2833

**What**
Migrate workflow engines to DML. Also includes an update to the linkable generation, which now takes into account id and primary keys to generate the linkable instead of only primary keys.
2024-12-06 13:23:07 +00:00
Carlos R. L. Rodrigues
90ae187e09 fix(workflows-sdk): name for when/then step (#10459) 2024-12-05 15:47:42 -03:00
Carlos R. L. Rodrigues
1eef324af3 fix(orchestration): fix set step failure (#10031)
What:
 - copy data before saving checkpoint
 - removed unused data format function
 - properly handle registerStepFailure to not throw
 - emit onFinish event even when execution failed
2024-11-12 10:06:36 +00:00
Adrien de Peretti
34d57870ad chore: workflow internals improvements (#9455) 2024-10-10 09:11:56 +02:00
Adrien de Peretti
e096feb7d5 chore: Update modules deps (#9286) 2024-09-26 11:14:35 +05:30
Adrien de Peretti
6937d74252 chore(): Move dependencies around (#9278)
RESOLVES FRMW-2716 [1]

**What**
First stab at re organising dependencies in the modules and their usage.
2024-09-24 14:58:04 +00:00
Harminder Virk
9e711720dd chore: upgrade moduleResolution to Node16 (#9269) 2024-09-24 17:19:20 +05:30
Adrien de Peretti
58167b5dfa feat(index): Index module foundation (#9095)
**What**
Index module foundation

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
2024-09-18 19:04:04 +00:00
Carlos R. L. Rodrigues
adbeb9cc1d fix(workflow-engine-*): pass container to flow (#9180) 2024-09-18 08:44:11 -03:00
Carlos R. L. Rodrigues
ef8dc4087e feat: run nested async workflows (#9119) 2024-09-16 13:06:45 +00:00