feat(index): full sync operations (#11178)

Closes: FRMW-2892, FRMW-2893

**What**
Wired up the building block that we merged previously in order to manage data synchronization. The flow is as follow
- On application start
  - Build schema object representation from configuration
  - Check configuration changes
    - if new entities configured
      - Data synchronizer initialize orchestrator and start sync
        - for each entity
          - acquire lock
          - mark existing data as staled
          - sync all data by batch
          - marked them not staled anymore
          - acknowledge each processed batch and renew lock
          - update metadata with last synced cursor for entity X
          - release lock
      - remove all remaining staled data
    - if any entities removed from last configuration
      - remove the index data and relations

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2025-02-05 17:49:18 +01:00
committed by GitHub
parent 60f46e07fd
commit a33aebd895
35 changed files with 1677 additions and 727 deletions
@@ -0,0 +1,356 @@
import {
CommonEvents,
ContainerRegistrationKeys,
groupBy,
Modules,
promiseAll,
} from "@medusajs/framework/utils"
import {
Event,
ILockingModule,
IndexTypes,
Logger,
ModulesSdkTypes,
RemoteQueryFunction,
SchemaObjectEntityRepresentation,
} from "@medusajs/types"
import { IndexMetadataStatus, Orchestrator } from "@utils"
export class DataSynchronizer {
#container: Record<string, any>
#isReady: boolean = false
#schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
#storageProvider: IndexTypes.StorageProvider
#orchestrator!: Orchestrator
get #query() {
return this.#container[
ContainerRegistrationKeys.QUERY
] as RemoteQueryFunction
}
get #locking() {
return this.#container[Modules.LOCKING] as ILockingModule
}
get #indexMetadataService(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.#container.indexMetadataService
}
get #indexSyncService(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.#container.indexSyncService
}
get #indexDataService(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.#container.indexDataService
}
// @ts-ignore
get #indexRelationService(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.#container.indexRelationService
}
get #logger(): Logger {
try {
return this.#container.logger
} catch (err) {
return console as unknown as Logger
}
}
constructor(container: Record<string, any>) {
this.#container = container
}
#isReadyOrThrow() {
if (!this.#isReady) {
throw new Error(
"DataSynchronizer is not ready. Call onApplicationStart first."
)
}
}
onApplicationStart({
schemaObjectRepresentation,
storageProvider,
}: {
lockDuration?: number
schemaObjectRepresentation: IndexTypes.SchemaObjectRepresentation
storageProvider: IndexTypes.StorageProvider
}) {
this.#storageProvider = storageProvider
this.#schemaObjectRepresentation = schemaObjectRepresentation
this.#isReady = true
}
async syncEntities(
entities: {
entity: string
fields: string
fields_hash: string
}[],
lockDuration: number = 1000 * 60 * 5
) {
this.#isReadyOrThrow()
const entitiesToSync = entities.map((entity) => entity.entity)
this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, {
lockDuration,
})
await this.#orchestrator.process(this.#taskRunner.bind(this))
}
async removeEntities(entities: string[], staleOnly: boolean = false) {
this.#isReadyOrThrow()
const staleCondition = staleOnly ? { staled_at: { $ne: null } } : {}
const dataToDelete = await this.#indexDataService.list({
...staleCondition,
name: entities,
})
const toDeleteByEntity = groupBy(dataToDelete, "name")
for (const entity of toDeleteByEntity.keys()) {
const records = toDeleteByEntity.get(entity)
const ids = records?.map(
(record: { data: { id: string } }) => record.data.id
)
if (!ids?.length) {
continue
}
if (this.#schemaObjectRepresentation[entity]) {
// Here we assume that some data have been deleted from from the source and we are cleaning since they are still staled in the index and we remove them from the index
// TODO: expand storage provider interface
await (this.#storageProvider as any).onDelete({
entity,
data: ids,
schemaEntityObjectRepresentation:
this.#schemaObjectRepresentation[entity],
})
} else {
// Here we assume that the entity is not indexed anymore as it is not part of the schema object representation and we are cleaning the index
await promiseAll([
this.#indexDataService.delete({
selector: {
name: entity,
},
}),
this.#indexRelationService.delete({
selector: {
$or: [{ parent_id: entity }, { child_id: entity }],
},
}),
])
}
}
}
async #updatedStatus(entity: string, status: IndexMetadataStatus) {
await this.#indexMetadataService.update({
data: {
status,
},
selector: {
entity,
},
})
}
async #taskRunner(entity: string) {
const [[lastCursor]] = await promiseAll([
this.#indexSyncService.list(
{
entity,
},
{
select: ["last_key"],
}
),
this.#updatedStatus(entity, IndexMetadataStatus.PROCESSING),
this.#indexDataService.update({
data: {
staled_at: new Date(),
},
selector: {
name: entity,
},
}),
])
const finalAcknoledgement = await this.syncEntity({
entityName: entity,
pagination: {
cursor: lastCursor?.last_key,
},
ack: async (ack) => {
const promises: Promise<any>[] = []
if (ack.lastCursor) {
promises.push(
this.#indexSyncService.update({
data: {
last_key: ack.lastCursor,
},
selector: {
entity,
},
})
)
if (!ack.done && !ack.err) {
promises.push(this.#orchestrator.renewLock(entity))
}
}
await promiseAll(promises)
},
})
if (finalAcknoledgement.done) {
await promiseAll([
this.#updatedStatus(entity, IndexMetadataStatus.DONE),
this.#indexSyncService.update({
data: {
last_key: finalAcknoledgement.lastCursor,
},
selector: {
entity,
},
}),
this.removeEntities([entity], true),
])
}
if (finalAcknoledgement.err) {
await this.#updatedStatus(entity, IndexMetadataStatus.ERROR)
}
}
async syncEntity({
entityName,
pagination = {},
ack,
}: {
entityName: string
pagination?: {
cursor?: string
updated_at?: string | Date
limit?: number
batchSize?: number
}
ack: (ack: {
lastCursor: string | null
done?: boolean
err?: Error
}) => Promise<void>
}): Promise<{
lastCursor: string | null
done?: boolean
err?: Error
}> {
this.#isReadyOrThrow()
const schemaEntityObjectRepresentation = this.#schemaObjectRepresentation[
entityName
] as SchemaObjectEntityRepresentation
const { fields, alias, moduleConfig } = schemaEntityObjectRepresentation
const isLink = !!moduleConfig?.isLink
const entityPrimaryKey = fields.find(
(field) => !!moduleConfig?.primaryKeys?.includes(field)
)
if (!entityPrimaryKey) {
// TODO: for now these are skiped
const acknoledgement = {
lastCursor: pagination.cursor ?? null,
done: true,
}
await ack(acknoledgement)
return acknoledgement
}
let processed = 0
let currentCursor = pagination.cursor!
const batchSize = Math.min(pagination.batchSize ?? 100, 100)
const limit = pagination.limit ?? Infinity
let done = false
let error = null
while (processed < limit || !done) {
const filters: Record<string, any> = {}
if (currentCursor) {
filters[entityPrimaryKey] = { $gt: currentCursor }
}
if (pagination.updated_at) {
filters["updated_at"] = { $gt: pagination.updated_at }
}
const { data } = await this.#query.graph({
entity: alias,
fields: [entityPrimaryKey],
filters,
pagination: {
order: {
[entityPrimaryKey]: "asc",
},
take: batchSize,
},
})
done = !data.length
if (done) {
break
}
const envelop: Event = {
data,
name: !isLink
? `*.${CommonEvents.CREATED}`
: `*.${CommonEvents.ATTACHED}`,
}
try {
await this.#storageProvider.consumeEvent(
schemaEntityObjectRepresentation
)(envelop)
currentCursor = data[data.length - 1][entityPrimaryKey]
processed += data.length
await ack({ lastCursor: currentCursor })
} catch (err) {
this.#logger.error(
`Index engine] sync failed for entity ${entityName}`,
err
)
error = err
break
}
}
let acknoledgement: { lastCursor: string; done?: boolean; err?: Error } = {
lastCursor: currentCursor,
done: true,
}
if (error) {
acknoledgement = {
lastCursor: currentCursor,
err: error,
}
await ack(acknoledgement)
return acknoledgement
}
await ack(acknoledgement)
return acknoledgement
}
}