## Summary
This PR adds BullMQ queue and worker configuration options to the workflow-engine-redis module, bringing feature parity with the event-bus-redis module. It also introduces per-queue
configuration options for fine-grained control over the three internal queues (main, job, and cleaner).
Key changes:
- Added per-queue BullMQ configuration options (mainQueueOptions, jobQueueOptions, cleanerQueueOptions and their worker counterparts) with shared defaults
- Unified Redis option naming across modules: deprecated url → redisUrl, options → redisOptions (with backward compatibility)
- Moved configuration resolution to the loader and registered options in the DI container
- Added comprehensive JSDoc documentation for all configuration options
- Added unit tests for option merging and queue/worker configuration
## Configuration Example
```ts
// Simple configuration - same options for all queues
{
  redisUrl: "redis://localhost:6379",
  queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } },
  workerOptions: { concurrency: 10 },
}
```
```ts
// Advanced configuration - per-queue overrides
{
  redisUrl: "redis://localhost:6379",
  workerOptions: { concurrency: 10 }, // shared default
  jobWorkerOptions: { concurrency: 5 }, // override for scheduled workflows
  cleanerWorkerOptions: { concurrency: 1 }, // override for cleanup (low priority)
}
```
## Summary
**What** — What changes are introduced in this PR?
This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt due to Bull queue jobId collisions preventing retry jobs from executing.
**Why** — Why are these changes relevant or necessary?
Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once, schedule a retry, but the retry job would never execute, causing workflows to hang indefinitely.
**How** — How have these changes been implemented?
**Root Cause:** Bull queue was rejecting retry jobs because they had identical jobIds to the async execution jobs that already completed. Both used the format: `retry:workflow:transaction:step_id:attempts`.
**Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds:
- Async execution (interval=0): `retry:...:step_id:1`
- Retry scheduling (interval>0): `retry:...:step_id:1:retry`
Updated methods: `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter.
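A minimal sketch of the suffix logic (a standalone illustration mirroring the description above; names and parameters are not the exact implementation in `workflow-orchestrator-storage.ts`):

```ts
// Illustrative sketch of the fix, not the actual method
function getJobId(
  workflowId: string,
  transactionId: string,
  stepId: string,
  attempts: number,
  interval = 0
): string {
  const base = `retry:${workflowId}:${transactionId}:${stepId}:${attempts}`
  // interval > 0 means a scheduled retry rather than the initial async execution,
  // so a suffix keeps it from colliding with the already-completed execution job
  return interval > 0 ? `${base}:retry` : base
}
```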
**Testing** — How have these changes been tested, or how can the reviewer test the feature?
Added integration test `retry-interval.spec.ts` that verifies:
1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times
2. Retry intervals are approximately 1 second between attempts
3. Workflow completes successfully after retries
4. Uses proper async workflow completion pattern with `subscribe()` and `onFinish` event
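For reference, the completion pattern in point 4 looks roughly like this (a sketch assuming the workflow engine's `subscribe` API; the exact payload shapes may differ):

```ts
// Assumed context: the resolved workflow engine service and the transaction id
// returned by the initial run of the workflow under test
declare const workflowEngine: {
  subscribe(args: {
    workflowId: string
    transactionId: string
    subscriber: (event: { eventType: string }) => void
  }): Promise<void> | void
}
declare const transactionId: string

await new Promise<void>((resolve) => {
  void workflowEngine.subscribe({
    workflowId: "workflow-with-retry-step",
    transactionId,
    subscriber: (event) => {
      if (event.eventType === "onFinish") {
        resolve() // the workflow completed after its retries
      }
    },
  })
})
```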
---
## Examples
```ts
// Example workflow step that would previously get stuck
import { createStep } from "@medusajs/workflows-sdk"

// Module-level counter so the step can fail on its first two attempts
let attempts = 0

export const testRetryStep = createStep(
  {
    name: "test-retry-step",
    async: true,
    retryInterval: 1, // 1 second retry interval
    maxRetries: 3,
  },
  async (input: any) => {
    attempts++
    // Simulate failure on the first 2 attempts
    if (attempts < 3) {
      throw new Error("Temporary failure - will retry")
    }
    return { success: true }
  }
)
// Before fix: Step would fail once, schedule retry, but retry job never fired (jobId collision)
// After fix: Step properly retries up to 3 times with 1-second intervals
```
---
## Checklist
Please ensure the following before requesting a review:
- [ ] I have added a **changeset** for this PR
- Every non-breaking change should be marked as a **patch**
- To add a changeset, run `yarn changeset` and follow the prompts
- [ ] The changes are covered by relevant **tests**
- [ ] I have verified the code works as intended locally
- [ ] I have linked the related issue(s) if applicable
---
## Additional Context
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
RESOLVES CORE-1206
**What**
Instead of removing the cleaner repeatable job and risking removing it while other instances are still up, we now always create it. Since a job's id is unique and we assign one to the job, it won't be added again if it is already present.
RESOLVES CORE-1163
RESOLVES CORE-1164
**What**
### Add support for non-auto-retryable steps
When a step configured with `maxRetries` fails, it is marked as a temporary failure and then retries itself automatically. That is the default behaviour. If you now add `autoRetry: false`, a failing step is still marked as a temporary failure but is not retried automatically; you can then call the workflow engine's run to resume the workflow from the failing step (see the sketch after the next section).
### Add support for `maxAwaitingRetries`
When setting `retryIntervalAwaiting`, a step that is awaiting is retried after the specified interval with no maximum. You can now set `maxAwaitingRetries` to enforce a maximum number of awaiting retries, as shown below.
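A minimal sketch combining both new options, assuming the same `createStep` config shape as the earlier example (the handler body is illustrative):

```ts
import { createStep } from "@medusajs/workflows-sdk"

export const externalSyncStep = createStep(
  {
    name: "external-sync-step",
    async: true,
    maxRetries: 3,
    autoRetry: false, // failures are marked temporary but not retried automatically
    retryIntervalAwaiting: 30, // re-check the awaiting step every 30 seconds
    maxAwaitingRetries: 10, // stop awaiting retries after 10 attempts
  },
  async () => {
    // Stays in an awaiting state until marked as succeeded/failed externally
  }
)
```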
### Add support to manually retry an awaiting step
In some scenarios (for example, a machine dies while a step is executing, or a step takes longer than expected), you can now call `retryStep` on the workflow engine to force a retry of a step that appears to be stuck.
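An illustrative invocation (the argument names are assumptions based on the description above, not the confirmed signature):

```ts
// Hypothetical call shape; names are assumptions, not the confirmed API
declare const workflowEngine: {
  retryStep(args: {
    workflowId: string
    transactionId: string
    stepId: string
  }): Promise<void>
}
declare const transactionId: string

// Force a retry of the step that appears to be stuck
await workflowEngine.retryStep({
  workflowId: "external-sync-workflow",
  transactionId,
  stepId: "external-sync-step",
})
```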
* chore(): only execute race execution checks for async workflows
* chore(): workflow redis publish only for async flows
* Create cyan-gorillas-poke.md
* fix negative check
---------
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
* fix(orchestration): Use the step definition max retries on set step failure
* Create sweet-turkeys-wait.md
* allow to force permanent failure
* update changeset
FIXES CLO-524
**What**
Add a hidden `stepDefinition` object as part of the step argument, and ensure the `runAsStep` handlers rely on the latest definition when `config` is used on the returned step, so that async configuration propagates correctly, including nested configuration.
**What**
Currently, `runAsStep` keeps a reference to the context of the workflow being run as a step, but the step is composed for the current workflow composition, not for the workflow being run as a step. The contexts are therefore mismatched, leading to the wrong configuration being used in the case of async workflows.
**BUG**
This fix allows `runAsStep` to use the current composition context to configure the step for the sub-workflow being run.
**BUG BREAKING**
Fix the step config being wrongly used to wrap async step handlers. Steps configured as async through `.config()` that return a new step response will now mark themselves as successful without needing background execution or a call to `setStepSuccess` (as was originally expected).
**FEATURE**
This PR also adds support for cancelling a running transaction. The transaction is marked as being cancelled; once the current step finishes, the transaction is cancelled and starts compensating all previous steps, including the current one.
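An illustrative cancellation call (the shape is an assumption; the PR describes the behaviour, not the exact API):

```ts
// Hypothetical call shape; the exact signature may differ
declare const workflowEngine: {
  cancel(workflowId: string, options: { transactionId: string }): Promise<void>
}
declare const transactionId: string

// Mark the running transaction as cancelled; once the current step finishes,
// compensation of all previous steps (including the current one) begins
await workflowEngine.cancel("my-async-workflow", { transactionId })
```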
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
**What**
Make sure no open handles are left and that the shutdown functions are properly called. Refactor and improve the Medusa test runner. Make sure all module instances are released and cleaned up.
**NOTE:**
In a separate PR we can continue the investigation into memory growing over time while the tests execute.
**What**
Now that all event management is fixed in the workflow life cycle, run-as-step needs to leverage the workflow engine when present (which should always be the case for async workflows) to ensure continuation and the ability to mark the parent step in the parent workflow as a success or failure.
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
* fix(workflow-engine-*): Prevent passing shared context reference
* prevent tests from hanging
* fix event handling
* add integration tests
* use interval for scheduled in tests
* skip tests for now
* Create silent-glasses-enjoy.md
* fix cancel
* changeset
* push multiple aliases
* test multiple field alias
* increase wait time to index on test
---------
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
**What**
Currently, only cron patterns are supported by scheduled jobs, which can lead to issues. For example, if you set a pattern to execute every hour at minute 0 and second 0 (as it is expected to execute exactly on this constraint), but the job happens to be picked up after second 0 has passed, it won't execute until the next scheduled cron slot.
With this PR we introduce the `interval` configuration, which lets you specify a delay between executions in ms (e.g. every minute -> 60 * 1000 ms) and ensures that once a job executes, the next one is scheduled for an interval later.
**Usage**
```ts
// jobs/job-1.ts
const thirtySeconds = 30 * 1000

export const config = {
  name: "job-1",
  schedule: {
    interval: thirtySeconds,
  },
}
```
What:
* Old deployments have repeatable jobs registered in the wrong queue. When the `server` instance picks up such a job, the workflow doesn't exist, so it calls to remove the job, which then removes the job from the new queue.
* This PR cleans up any repeatable jobs from the queue that is exclusive to handling workflows.
**What**
Currently, the Redis workflow engine makes no distinction between worker modes: when starting as a server, the engine listens to the queue that contains everything and tries to execute the corresponding workflow, which doesn't exist since job workflows are not loaded in server mode. Now a dedicated queue is created for jobs, and the worker is only started if the instance is not in server mode. To clean up the old queue, any scheduled job triggered from it is removed, since it will be re-added to the new queue by the new worker instances.
FIXES FRMW-2878
**What**
Currently, the `one-to-one` unique constraints do not account for deleted records. This prevents inserting a new record with the same FK when another one has been deleted.
**Caveat**
`hasOne` with the FK option is meant to be a special case, for example a many-to-one/one-to-many without defining the other side of the relation. In that case we don't handle this behaviour and keep it as it is.
RESOLVES FRMW-2832
RESOLVES FRMW-2833
**What**
Migrate the workflow engines to DML. This also includes an update to the linkable generation, which now takes both `id` fields and primary keys into account when generating the linkables, instead of only primary keys.
What:
- copy data before saving checkpoint
- removed unused data format function
- properly handle registerStepFailure to not throw
- emit onFinish event even when execution failed