chore(): Prevent sub workflow events release early + redis unlink (#11641)

**What**
- Prevent event release when a workflow is run as step and finish
- Use `unlink` instead of `del` when removing keys from redist to push the execution to async thread
This commit is contained in:
Adrien de Peretti
2025-02-27 11:33:30 +01:00
committed by GitHub
parent ca6a15717d
commit c250de7919
7 changed files with 26 additions and 14 deletions

View File

@@ -0,0 +1,9 @@
---
"@medusajs/cache-redis": patch
"@medusajs/event-bus-redis": patch
"@medusajs/workflow-engine-redis": patch
"@medusajs/locking-redis": patch
"@medusajs/workflows-sdk": patch
---
chore(): Prevent sub workflow events release early + redis unlink

View File

@@ -93,7 +93,9 @@ function createContextualWorkflowRunner<
const { eventGroupId, parentStepIdempotencyKey } = context
attachOnFinishReleaseEvents(events, flow, { logOnError })
if (!parentStepIdempotencyKey) {
attachOnFinishReleaseEvents(events, flow, { logOnError })
}
const flowMetadata = {
eventGroupId,

View File

@@ -66,7 +66,7 @@ class RedisCacheService implements ICacheService {
return JSON.parse(cached)
}
} catch (err) {
await this.redis.del(cacheKey)
await this.redis.unlink(cacheKey)
}
return null
}
@@ -92,7 +92,7 @@ class RedisCacheService implements ICacheService {
if (keys.length > 0) {
const deletePipeline = this.redis.pipeline()
for (const key of keys) {
deletePipeline.del(key)
deletePipeline.unlink(key)
}
await deletePipeline.exec()

View File

@@ -27,6 +27,7 @@ const redisMock = {
lrange: () => jest.fn(),
disconnect: () => jest.fn(),
expire: () => jest.fn(),
unlink: () => jest.fn(),
} as unknown as Redis
const simpleModuleOptions = { redisUrl: "test-url" }
@@ -63,7 +64,7 @@ describe("RedisEventBusService", () => {
{
connection: expect.any(Object),
prefix: "RedisEventBusService",
autorun: false
autorun: false,
}
)
})
@@ -269,7 +270,7 @@ describe("RedisEventBusService", () => {
},
]
redis.del = jest.fn()
redis.unlink = jest.fn()
await eventBus.emit(events, options)
@@ -277,7 +278,7 @@ describe("RedisEventBusService", () => {
// Expect 2 pushes to redis as there are 2 groups of events to push
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(redis.rpush).toHaveBeenCalledTimes(2)
expect(redis.del).not.toHaveBeenCalled()
expect(redis.unlink).not.toHaveBeenCalled()
const [testGroup1Event] = (eventBus as any).buildEvents(
[events[0]],
@@ -314,12 +315,12 @@ describe("RedisEventBusService", () => {
expect(queue.addBulk).toHaveBeenCalledTimes(1)
expect(queue.addBulk).toHaveBeenCalledWith([testGroup1Event])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-1")
expect(redis.unlink).toHaveBeenCalledTimes(1)
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-1")
queue = (eventBus as any).queue_
queue.addBulk = jest.fn()
redis.del = jest.fn()
redis.unlink = jest.fn()
await eventBus.releaseGroupedEvents("test-group-2")
@@ -328,8 +329,8 @@ describe("RedisEventBusService", () => {
testGroup2Event,
testGroup2Event2,
])
expect(redis.del).toHaveBeenCalledTimes(1)
expect(redis.del).toHaveBeenCalledWith("staging:test-group-2")
expect(redis.unlink).toHaveBeenCalledTimes(1)
expect(redis.unlink).toHaveBeenCalledWith("staging:test-group-2")
})
})
})

View File

@@ -223,7 +223,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
return
}
await this.eventBusRedisConnection_.del(`staging:${eventGroupId}`)
await this.eventBusRedisConnection_.unlink(`staging:${eventGroupId}`)
}
/**

View File

@@ -258,7 +258,7 @@ export class RedisLockingProvider implements ILockingProvider {
const currentOwner = currentOwners?.[idx]?.[1]
if (currentOwner === ownerId) {
deletePipeline.del(key)
deletePipeline.unlink(key)
}
})

View File

@@ -33,7 +33,7 @@ async function deleteKeysByPattern(pattern) {
for await (const keys of stream) {
if (keys.length) {
const pipeline = redis.pipeline()
keys.forEach((key) => pipeline.del(key))
keys.forEach((key) => pipeline.unlink(key))
await pipeline.exec()
}
}