feat(locking): Locking module (#9524)

**What**
- Locking Module to manage concurrency
- Default `in-memory` provider
This commit is contained in:
Carlos R. L. Rodrigues
2024-10-11 13:30:06 -03:00
committed by GitHub
parent 5c9e289c4d
commit c8b375ae2d
28 changed files with 806 additions and 39 deletions

View File

@@ -0,0 +1,11 @@
import { Module, Modules } from "@medusajs/framework/utils"
import { LockingModuleService } from "@services"
import loadProviders from "./loaders/providers"
export default Module(Modules.LOCKING, {
service: LockingModuleService,
loaders: [loadProviders],
})
// Module options types
export { LockingModuleOptions } from "./types"

View File

@@ -0,0 +1,85 @@
import { moduleProviderLoader } from "@medusajs/framework/modules-sdk"
import {
LoaderOptions,
ModuleProvider,
ModulesSdkTypes,
} from "@medusajs/framework/types"
import { ContainerRegistrationKeys } from "@medusajs/framework/utils"
import { LockingProviderService } from "@services"
import {
LockingDefaultProvider,
LockingIdentifiersRegistrationName,
LockingProviderRegistrationPrefix,
} from "@types"
import { Lifetime, asFunction, asValue } from "awilix"
import { InMemoryLockingProvider } from "../providers/in-memory"
const registrationFn = async (klass, container, pluginOptions) => {
const key = LockingProviderService.getRegistrationIdentifier(klass)
container.register({
[LockingProviderRegistrationPrefix + key]: asFunction(
(cradle) => new klass(cradle, pluginOptions.options),
{
lifetime: klass.LIFE_TIME || Lifetime.SINGLETON,
}
),
})
container.registerAdd(LockingIdentifiersRegistrationName, asValue(key))
}
export default async ({
container,
options,
}: LoaderOptions<
(
| ModulesSdkTypes.ModuleServiceInitializeOptions
| ModulesSdkTypes.ModuleServiceInitializeCustomDataLayerOptions
) & { providers: ModuleProvider[] }
>): Promise<void> => {
const logger = container.resolve(ContainerRegistrationKeys.LOGGER)
container.registerAdd(LockingIdentifiersRegistrationName, asValue(undefined))
// InMemoryLockingProvider - default provider
container.register({
[LockingProviderRegistrationPrefix + InMemoryLockingProvider.identifier]:
asFunction((cradle) => new InMemoryLockingProvider(), {
lifetime: Lifetime.SINGLETON,
}),
})
container.registerAdd(
LockingIdentifiersRegistrationName,
asValue(InMemoryLockingProvider.identifier)
)
container.register(
LockingDefaultProvider,
asValue(InMemoryLockingProvider.identifier)
)
// Load other providers
await moduleProviderLoader({
container,
providers: options?.providers || [],
registerServiceFn: registrationFn,
})
const isSingleProvider = options?.providers?.length === 1
let hasDefaultProvider = false
for (const provider of options?.providers || []) {
if (provider.is_default || isSingleProvider) {
if (provider.is_default) {
hasDefaultProvider = true
}
container.register(LockingDefaultProvider, asValue(provider.id))
}
}
if (!hasDefaultProvider) {
logger.warn(
`No default locking provider explicit defined. Using "${container.resolve(
LockingDefaultProvider
)}" as default.`
)
}
}

View File

@@ -0,0 +1,176 @@
import { ILockingProvider } from "@medusajs/framework/types"
import { isDefined } from "@medusajs/framework/utils"
type LockInfo = {
ownerId: string | null
expiration: number | null
currentPromise?: ResolvablePromise
}
type ResolvablePromise = {
promise: Promise<any>
resolve: () => void
}
export class InMemoryLockingProvider implements ILockingProvider {
static identifier = "in-memory"
private locks: Map<string, LockInfo> = new Map()
constructor() {}
private getPromise(): ResolvablePromise {
let resolve: any
const pro = new Promise((ok) => {
resolve = ok
})
return {
promise: pro,
resolve,
}
}
async execute<T>(
keys: string | string[],
job: () => Promise<T>,
args?: {
timeout?: number
}
): Promise<T> {
keys = Array.isArray(keys) ? keys : [keys]
const timeoutSeconds = args?.timeout ?? 5
const promises: Promise<any>[] = []
if (timeoutSeconds > 0) {
promises.push(this.getTimeout(timeoutSeconds))
}
promises.push(
this.acquire(keys, {
awaitQueue: true,
})
)
await Promise.race(promises).catch(async (err) => {
await this.release(keys)
})
try {
return await job()
} finally {
await this.release(keys)
}
}
async acquire(
keys: string | string[],
args?: {
ownerId?: string | null
expire?: number
awaitQueue?: boolean
}
): Promise<void> {
keys = Array.isArray(keys) ? keys : [keys]
const { ownerId, expire } = args ?? {}
for (const key of keys) {
const lock = this.locks.get(key)
const now = Date.now()
if (!lock) {
this.locks.set(key, {
ownerId: ownerId ?? null,
expiration: expire ? now + expire * 1000 : null,
currentPromise: this.getPromise(),
})
continue
}
if (lock.expiration && lock.expiration <= now) {
lock.currentPromise?.resolve?.()
this.locks.set(key, {
ownerId: ownerId ?? null,
expiration: expire ? now + expire * 1000 : null,
currentPromise: this.getPromise(),
})
continue
}
if (lock.ownerId === ownerId) {
if (expire) {
lock.expiration = now + expire * 1000
this.locks.set(key, lock)
}
continue
}
if (lock.currentPromise && args?.awaitQueue) {
await lock.currentPromise.promise
return this.acquire(keys, args)
}
throw new Error(`"${key}" is already locked.`)
}
}
async release(
keys: string | string[],
args?: {
ownerId?: string | null
}
): Promise<boolean> {
const { ownerId } = args ?? {}
keys = Array.isArray(keys) ? keys : [keys]
let success = true
for (const key of keys) {
const lock = this.locks.get(key)
if (!lock) {
success = false
continue
}
if (isDefined(ownerId) && lock.ownerId !== ownerId) {
success = false
continue
}
lock.currentPromise?.resolve?.()
this.locks.delete(key)
}
return success
}
async releaseAll(args?: { ownerId?: string | null }): Promise<void> {
const { ownerId } = args ?? {}
if (!isDefined(ownerId)) {
for (const [key, lock] of this.locks.entries()) {
lock.currentPromise?.resolve?.()
this.locks.delete(key)
}
} else {
for (const [key, lock] of this.locks.entries()) {
if (lock.ownerId === ownerId) {
lock.currentPromise?.resolve?.()
this.locks.delete(key)
}
}
}
}
private async getTimeout(seconds: number): Promise<void> {
return new Promise((_, reject) => {
setTimeout(() => {
reject(new Error("Timed-out acquiring lock."))
}, seconds * 1000)
})
}
}

View File

@@ -0,0 +1,2 @@
export { default as LockingModuleService } from "./locking-module"
export { default as LockingProviderService } from "./locking-provider"

View File

@@ -0,0 +1,90 @@
import {
Context,
ILockingModule,
InternalModuleDeclaration,
} from "@medusajs/types"
import { EntityManager } from "@mikro-orm/core"
import { LockingDefaultProvider } from "@types"
import LockingProviderService from "./locking-provider"
type InjectedDependencies = {
manager: EntityManager
lockingProviderService: LockingProviderService
[LockingDefaultProvider]: string
}
export default class LockingModuleService implements ILockingModule {
protected manager: EntityManager
protected providerService_: LockingProviderService
protected defaultProviderId: string
constructor(
container: InjectedDependencies,
protected readonly moduleDeclaration: InternalModuleDeclaration
) {
this.manager = container.manager
this.providerService_ = container.lockingProviderService
this.defaultProviderId = container[LockingDefaultProvider]
}
async execute<T>(
keys: string | string[],
job: () => Promise<T>,
args?: {
timeout?: number
provider?: string
},
sharedContext: Context = {}
): Promise<T> {
const providerId = args?.provider ?? this.defaultProviderId
const provider =
this.providerService_.retrieveProviderRegistration(providerId)
return provider.execute(keys, job, args, sharedContext)
}
async acquire(
keys: string | string[],
args?: {
ownerId?: string | null
expire?: number
provider?: string
},
sharedContext: Context = {}
): Promise<void> {
const providerId = args?.provider ?? this.defaultProviderId
const provider =
this.providerService_.retrieveProviderRegistration(providerId)
await provider.acquire(keys, args, sharedContext)
}
async release(
keys: string | string[],
args?: {
ownerId?: string | null
provider?: string
},
sharedContext: Context = {}
): Promise<boolean> {
const providerId = args?.provider ?? this.defaultProviderId
const provider =
this.providerService_.retrieveProviderRegistration(providerId)
return await provider.release(keys, args, sharedContext)
}
async releaseAll(
args?: {
ownerId?: string | null
provider?: string
},
sharedContext: Context = {}
): Promise<void> {
const providerId = args?.provider ?? this.defaultProviderId
const provider =
this.providerService_.retrieveProviderRegistration(providerId)
return await provider.releaseAll(args, sharedContext)
}
}

View File

@@ -0,0 +1,40 @@
import { Constructor, ILockingProvider } from "@medusajs/framework/types"
import { MedusaError } from "@medusajs/framework/utils"
import { LockingProviderRegistrationPrefix } from "../types"
type InjectedDependencies = {
[key: `lp_${string}`]: ILockingProvider
}
export default class LockingProviderService {
protected __container__: InjectedDependencies
constructor(container: InjectedDependencies) {
this.__container__ = container
}
static getRegistrationIdentifier(
providerClass: Constructor<ILockingProvider>
) {
if (!(providerClass as any).identifier) {
throw new MedusaError(
MedusaError.Types.INVALID_ARGUMENT,
`Trying to register a locking provider without an identifier.`
)
}
return `${(providerClass as any).identifier}`
}
public retrieveProviderRegistration(providerId: string): ILockingProvider {
try {
return this.__container__[
`${LockingProviderRegistrationPrefix}${providerId}`
]
} catch (err) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`Could not find a locking provider with id: ${providerId}`
)
}
}
}

View File

@@ -0,0 +1,33 @@
import {
ModuleProviderExports,
ModuleServiceInitializeOptions,
} from "@medusajs/framework/types"
export const LockingDefaultProvider = "default_provider"
export const LockingIdentifiersRegistrationName = "locking_providers_identifier"
export const LockingProviderRegistrationPrefix = "lp_"
export type LockingModuleOptions = Partial<ModuleServiceInitializeOptions> & {
/**
* Providers to be registered
*/
providers?: {
/**
* The module provider to be registered
*/
resolve: string | ModuleProviderExports
/**
* If the provider is the default
*/
is_default?: boolean
/**
* The id of the provider
*/
id: string
/**
* key value pair of the configuration to be passed to the provider constructor
*/
options?: Record<string, unknown>
}[]
}