Files
Adrien de Peretti 516f5a3896 fix: workflow async concurrency (#13769)
* executeAsync

* || 1

* wip

* stepId

* stepId

* wip

* wip

* continue versioning management changes

* fix and improve concurrency

* update in memory engine

* remove duplicated test

* fix script

* Create weak-drinks-confess.md

* fixes

* fix

* fix

* continuation

* centralize merge checkpoint

* centralize merge checkpoint

* fix locking

* rm only

* Continue improvements and fixes

* fixes

* fixes

* hasAwaiting will be recomputed

* fix orchestrator engine

* bump version on async parallel steps only

* mark as delivered fix

* changeset

* check partitions

* avoid saving when having parent step

* cart test

---------

Co-authored-by: Carlos R. L. Rodrigues <rodrigolr@gmail.com>
Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
Co-authored-by: Oli Juhl <59018053+olivermrbl@users.noreply.github.com>
2025-10-20 15:29:19 +02:00

46 lines
1.0 KiB
TypeScript

import * as process from "process"
// Postgres connection settings, read from the environment once at module load.
// Host falls back to "localhost" and username to "" when unset; password and
// database name stay `undefined` when their variables are missing.
const {
  DB_HOST = "localhost",
  DB_USERNAME = "",
  DB_PASSWORD,
  DB_TEMP_NAME: DB_NAME,
} = process.env

// The ":password" segment is only emitted for a non-empty password.
const DB_CREDENTIALS = DB_PASSWORD
  ? DB_USERNAME + ":" + DB_PASSWORD
  : DB_USERNAME

/**
 * Connection URL of the form `postgres://user[:password]@host/database`,
 * where the database segment comes from `DB_TEMP_NAME`.
 */
export const DB_URL =
  "postgres://" + DB_CREDENTIALS + "@" + DB_HOST + "/" + DB_NAME
// ioredis is loaded through CommonJS `require` rather than an ES import.
const Redis = require("ioredis")
// Use REDIS_URL when it is set and non-empty; otherwise fall back to a
// Redis instance on localhost:6379.
const redisUrl = process.env.REDIS_URL || "redis://localhost:6379"
// Single client shared by the helpers in this module; it is constructed as
// soon as the module is loaded.
const redis = new Redis(redisUrl)
/**
 * Shape of the exported `TestDatabase` helper.
 */
interface TestDatabase {
  /** Resolves once the cleanup has finished. */
  clearTables(): Promise<void>
}

/**
 * Test helper whose `clearTables` delegates to `cleanRedis`.
 */
export const TestDatabase: TestDatabase = {
  async clearTables() {
    await cleanRedis()
  },
}
/**
 * Removes every key matching `pattern` through the module-level `redis` client.
 *
 * Matching keys are discovered with `scanStream` and queued as `UNLINK`
 * commands on a single pipeline, which is sent once the scan has finished.
 *
 * @param pattern - Match pattern forwarded to the scan (e.g. `"bull:*"`).
 * @throws The first error reported by a queued `UNLINK` command.
 */
async function deleteKeysByPattern(pattern: string): Promise<void> {
  // Each chunk yielded by the scan stream is a batch of key names.
  const stream: AsyncIterable<string[]> = redis.scanStream({
    match: pattern,
    count: 100,
  })
  const pipeline = redis.pipeline()

  for await (const keys of stream) {
    for (const key of keys) {
      pipeline.unlink(key)
    }
  }

  // `exec()` resolves with one `[error, result]` tuple per queued command
  // instead of rejecting, so per-command failures have to be surfaced
  // explicitly; otherwise a failed cleanup would go unnoticed.
  const results: [Error | null, unknown][] | null = await pipeline.exec()
  for (const [error] of results ?? []) {
    if (error) {
      throw error
    }
  }
}
/**
 * Deletes the keys under the `bull:` and `dtrx:` prefixes, one prefix after
 * the other.
 */
async function cleanRedis() {
  const patterns = ["bull:*", "dtrx:*"]
  for (const pattern of patterns) {
    // Sequential on purpose: each pattern is fully processed before the next.
    await deleteKeysByPattern(pattern)
  }
}