## Summary
**What** — What changes are introduced in this PR?
This PR fixes a bug where async workflow steps with retry intervals would get stuck after the first retry attempt due to Bull queue jobId collisions preventing retry jobs from executing.
**Why** — Why are these changes relevant or necessary?
Workflows using async steps with retry configurations (e.g., `retryInterval: 1`, `maxRetries: 5`) would fail once, schedule a retry, but the retry job would never execute, causing workflows to hang indefinitely.
**How** — How have these changes been implemented?
**Root Cause:** Bull queue was rejecting retry jobs because they had identical jobIds to the async execution jobs that already completed. Both used the format: `retry:workflow:transaction:step_id:attempts`.
**Solution:** Modified `getJobId()` in `workflow-orchestrator-storage.ts` to append a `:retry` suffix when `interval > 0`, creating unique jobIds:
- Async execution (interval=0): `retry:...:step_id:1`
- Retry scheduling (interval>0): `retry:...:step_id:1:retry`
Updated methods: `getJobId()`, `scheduleRetry()`, `removeJob()`, and `clearRetry()` to pass and handle the interval parameter.
**Testing** — How have these changes been tested, or how can the reviewer test the feature?
Added integration test `retry-interval.spec.ts` that verifies:
1. Step with `retryInterval: 1` and `maxRetries: 3` executes 3 times
2. Retry intervals are approximately 1 second between attempts
3. Workflow completes successfully after retries
4. Uses proper async workflow completion pattern with `subscribe()` and `onFinish` event
---
## Examples
```ts
// Example workflow step that would previously get stuck
export const testRetryStep = createStep(
{
name: "test-retry-step",
async: true,
retryInterval: 1, // 1 second retry interval
maxRetries: 3,
},
async (input: any) => {
// Simulate failure on first 2 attempts
if (attempts < 3) {
throw new Error("Temporary failure - will retry")
}
return { success: true }
}
)
// Before fix: Step would fail once, schedule retry, but retry job never fired (jobId collision)
// After fix: Step properly retries up to 3 times with 1-second intervals
```
---
## Checklist
Please ensure the following before requesting a review:
- [ ] I have added a **changeset** for this PR
- Every non-breaking change should be marked as a **patch**
- To add a changeset, run `yarn changeset` and follow the prompts
- [ ] The changes are covered by relevant **tests**
- [ ] I have verified the code works as intended locally
- [ ] I have linked the related issue(s) if applicable
---
## Additional Context
-
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
**What**
It seems that for some reason the weak map fail in some scenario, but after investigation, the usage of map would not have a bad impact as it will be released after the Distributed transaction if finished. Therefore, falling back to Map instead
FIXES https://github.com/medusajs/medusa/issues/13654
NOTE: Waiting for the user feedback as he is also using node 18. We also use the exact same pattern in all our core flows without issues 🤔
RESOLVES CORE-1171
**What**
- Reduced async overhead for objects with mixed sync/async properties
- Lower memory pressure from eliminated promise allocations
- Faster primitive value processing with early returns
- Better concurrency through selective batching of truly async operations
- Event loop friendly behavior preventing artificial delays
- Reduced memory allocation from eliminated duplicate processing
- Decreased GC pressure from WeakMap-based caching instead of map
**note**
Now, `resolveValue`always treat every resoltion as sync operation unless it is not, meaning we do not create promise overhead when not necessary and only when actually treating with promises
RESOLVES CORE-1163
RESOLVES CORE-1164
**What**
### Add support for non auto retryable steps.
When marking a step with `maxRetries`, when it will fail it will be marked as temporary failure and then retry itself automatically. Thats the default behaviour, if you now add `autoRetry: false`, when the step will fail it will be marked as temporary failure but not retry automatically. you can now call the workflow engine run to resume the workflow from the failing step to be retried.
### Add support for `maxAwaitingRetries`
When setting `retyIntervalAwaiting` a step that is awaiting will be retried after the specified interval without maximun retry. Now you can set `maxAwaitingRetries` to force a maximum awaiting retry number
### Add support to manually retry an awaiting step
In some scenario, either a machine dies while a step is executing or a step is taking longer than expected, you can now call `retryStep` on the workflow engine to force a retry of the step that is supposedly stucked
What:
- Added query planning to the Remote Joiner, enabling phased and parallel execution of data aggregation.
- Replaced object deletes with non-enumerable property hiding to improve performance.
* fix(orchestration): Use the step definition max retries on set step failure
* Create sweet-turkeys-wait.md
* allow to force permanent failure
* update changeset
**What**
Currently, when cancelling async workflows, the step will get rescheduled while the current worker try to continue the execution leading to concurrency failure on compensation. This pr prevent the current worker from executing while an async step gets rescheduled
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
**What**
Currently, runAsStep keep reference of the workflow context that is being run as step, except that the step is composed for the current workflow composition and not the workflow being run as a step. Therefore, the context are currently miss matched leading to wrong configuration being used in case of async workflows.
**BUG**
This fix allow the runAsStep to use the current composition context to configure the step for the sub workflow to be run
**BUG BREAKING**
fix the step config wrongly used to wrap async step handlers. Now steps configured async through .config that returns a new step response will indeed marked itself as success without the need for background execution or calling setStepSuccess (as it was expected originally)
**FEATURE**
This pr also add support for cancelling running transaction, the transaction will be marked as being cancelled, once the current step finished, it will cancel the transaction to start compensating all previous steps including itself
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
What:
- Added step config `skipOnPermanentFailure`. Skip all the next steps when the current step fails. If a string is used, the workflow will resume from the given step.
- Fix `continueOnPermanentFailure` to continue the execution of the flow when a step fails.
```ts
createWorkflow("some-workflow", () => {
errorStep().config({
skipOnPermanentFailure: true,
})
nextStep1() // skipped
nextStep2() // skipped
})
createWorkflow("some-workflow", () => {
errorStep().config({
skipOnPermanentFailure: "resume-from-here",
});
nextStep1(); // skipped
nextStep2(); // skipped
nextStep3().config({ name: "resume-from-here" }); // executed
nextStep4(); // executed
});
```
**What**
Now that all events management are fixed in the workflows life cycle, the run as step needs to leverage the workflow engine if present (which should always be the case for async workflows) in order to ensure the continuation and the ability to mark parent step in parent workflow as success or failure
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
* fix(workflow-engine-*): Prevent passing shared context reference
* fix(workflow-engine-*): Prevent passing shared context reference
* prevent tests from hanging
* fix event handling
* add integration tests
* use interval for scheduled in tests
* skip tests for now
* Create silent-glasses-enjoy.md
* fix cancel
* changeset
* push multiple aliases
* test multiple field alias
* increase wait time to index on test
---------
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
**What**
Currently only cron pattern are supported by scheduled jobs, this can lead to issue. for example you set the pattern to execute every hours at minute 0 and second 0 (as it is expected to execute at exactly this constraint) but due to the moment it gets executed we our out of the second 0 then the job wont get executed until the next scheduled cron table execution.
With this pr we introduce the `interval` configuration which allows you the specify a delay between execution in ms (e.g every minute -> 60 * 1000 ms) which ensure that once a job is executed another one is scheduled for a minute later.
**Usage**
```ts
// jobs/job-1.ts
const thirtySeconds = 30 * 1000
export const config = {
name: "job-1",
schedule: {
interval: thirtySeconds
},
}
```
What:
- `query.index` helper. It queries the index module, and aggregate the rest of requested fields/relations if needed like `query.graph`.
Not covered in this PR:
- Hydrate only sub entities returned by the query. Example: 1 out of 5 variants have returned, it should only hydrate the data of the single entity, currently it will merge all the variants of the product.
- Generate types of indexed data
example:
```ts
const query = container.resolve(ContainerRegistrationKeys.QUERY)
await query.index({
entity: "product",
fields: [
"id",
"description",
"status",
"variants.sku",
"variants.barcode",
"variants.material",
"variants.options.value",
"variants.prices.amount",
"variants.prices.currency_code",
"variants.inventory_items.inventory.sku",
"variants.inventory_items.inventory.description",
],
filters: {
"variants.sku": { $like: "%-1" },
"variants.prices.amount": { $gt: 30 },
},
pagination: {
order: {
"variants.prices.amount": "DESC",
},
},
})
```
This query return all products where at least one variant has the title ending in `-1` and at least one price bigger than `30`.
The Index Module only hold the data used to paginate and filter, and the returned object is:
```json
{
"id": "prod_01JKEAM2GJZ14K64R0DHK0JE72",
"title": null,
"variants": [
{
"id": "variant_01JKEAM2HC89GWS95F6GF9C6YA",
"sku": "extra-variant-1",
"prices": [
{
"id": "price_01JKEAM2JADEWWX72F8QDP6QXT",
"amount": 80,
"currency_code": "USD"
}
]
}
]
}
```
All the rest of the fields will be hydrated from their respective modules, and the final result will be:
```json
{
"id": "prod_01JKEAY2RJTF8TW9A23KTGY1GD",
"description": "extra description",
"status": "draft",
"variants": [
{
"sku": "extra-variant-1",
"barcode": null,
"material": null,
"id": "variant_01JKEAY2S945CRZ6X4QZJ7GVBJ",
"options": [
{
"value": "Red"
}
],
"prices": [
{
"amount": 20,
"currency_code": "CAD",
"id": "price_01JKEAY2T2EEYSWZHPGG11B7W7"
},
{
"amount": 80,
"currency_code": "USD",
"id": "price_01JKEAY2T2NJK2E5468RK84CAR"
}
],
"inventory_items": [
{
"variant_id": "variant_01JKEAY2S945CRZ6X4QZJ7GVBJ",
"inventory_item_id": "iitem_01JKEAY2SNY2AWEHPZN0DDXVW6",
"inventory": {
"sku": "extra-variant-1",
"description": "extra variant 1",
"id": "iitem_01JKEAY2SNY2AWEHPZN0DDXVW6"
}
}
]
}
]
}
```
Co-authored-by: Adrien de Peretti <25098370+adrien2p@users.noreply.github.com>
What:
* Add support for aggregating data into existing resultset.
* `query.graph` new option `initialData` containing the resultset to be hydrated
* It fetches data where the requested fields are not present and merge with the existing resultset
FIXES FRMW-2852
**What**
A workflow distributed transaction expect any response and error to be serializable. When it is not the case, the distributed transaction might fail during the save checkpoint that occurs for async steps. This can lead to unexpected behaviour.
With this pr, we introduce a way to handle non serialazable object in a more sustainable manner, this means the following:
- If a workflow throw any non serialazable error (e.g AWS error that contains full IncomingMessage object that related to network communication, think of req/res) then we identify that this object is not serialzable and we clean up the object to make it serializable without loosing the main information, add a new error to the workflow to informed of this issue and can be handled by the user.
- If a response is not serializable (which should not happen at this point because it is handled before by the value resolver), in that case, we wont be able to reuse that response to continue the workflow which means that the workflow is in a non runnable state. In that case we throw a specific error stating that a non serializable context is being provided
**second what**
This pr refactor the `runAsStep` to add better support for workflow cancelation, especially async ones
What:
- copy data before saving checkpoint
- removed unused data format function
- properly handle registerStepFailure to not throw
- emit onFinish event even when execution failed
What
- Store result of cart-completion workflow for three days by default
- This enables the built-in idempotency mechanism to kick-in, provided the same transaction ID is used on workflow executions
- Return order from cart-completion workflow if the cart has already been completed
- In case transaction ID is not used on workflow executions, we still only want to complete a cart once