chore(orchestration): remote joiner query planner (#13364)

What:
 - Added query planning to the Remote Joiner, enabling phased and parallel execution of data aggregation.
- Replaced object deletes with non-enumerable property hiding to improve performance.
This commit is contained in:
Carlos R. L. Rodrigues
2025-09-04 11:18:02 -03:00
committed by GitHub
parent b7fef5b7ef
commit bd571aca82
16 changed files with 1234 additions and 191 deletions

View File

@@ -664,18 +664,19 @@ describe("RemoteJoiner", () => {
],
},
],
[
{
service: "user",
fieds: ["name", "id"],
},
],
[
{
service: "variantService",
fieds: ["id", "product_id"],
},
],
[
{
service: "user",
fieds: ["name", "id"],
},
],
[
{
service: "product",

View File

@@ -1,4 +1,6 @@
import {
ComputedJoinerRelationship,
ExecutionStage,
InternalJoinerServiceConfig,
JoinerRelationship,
JoinerServiceConfigAlias,
@@ -148,6 +150,105 @@ export class RemoteJoiner {
}, {})
}
// compute ids to fetch for a relationship
private computeIdsForRelationship(
items: any[],
relationship: ComputedJoinerRelationship
) {
const field = relationship.inverse
? relationship.primaryKey
: relationship.foreignKey.split(".").pop()!
const fieldsArray = relationship.inverse
? relationship.primaryKeyArr
: relationship.foreignKeyArr
const idsToFetch: Set<any> = new Set()
for (let i = 0; i < items.length; i++) {
const item = items[i]
if (!item) {
continue
}
const values = fieldsArray.map((f) => item?.[f])
if (values.length !== fieldsArray.length) {
continue
}
if (fieldsArray.length === 1) {
const val = values[0]
if (Array.isArray(val)) {
for (let x = 0; x < val.length; x++) {
idsToFetch.add(val[x])
}
} else {
idsToFetch.add(val)
}
} else {
idsToFetch.add(values)
}
}
return { field, fieldsArray, idsToFetch }
}
// assign fetched related data to items
private assignRelatedToItems(params: {
items: any[]
relationship: ComputedJoinerRelationship
relatedDataMap: Map<string, any>
field: string
fieldsArray: string[]
}) {
const { items, relationship, relatedDataMap, field, fieldsArray } = params
items.forEach((item) => {
if (!item) {
return
}
const itemKey = fieldsArray.map((f) => item[f]).join(",")
if (item[relationship.alias]) {
if (Array.isArray(item[field])) {
for (let i = 0; i < item[relationship.alias].length; i++) {
const it = item[relationship.alias][i]
item[relationship.alias][i] = Object.assign(
it,
relatedDataMap[it[relationship.primaryKey]]
)
}
return
}
item[relationship.alias] = Object.assign(
item[relationship.alias],
relatedDataMap[itemKey]
)
return
}
if (Array.isArray(item[field])) {
item[relationship.alias] = item[field].map((id) => {
if (relationship.isList && !Array.isArray(relatedDataMap[id])) {
relatedDataMap[id] = isDefined(relatedDataMap[id])
? [relatedDataMap[id]]
: []
}
return relatedDataMap[id]
})
} else {
if (relationship.isList && !Array.isArray(relatedDataMap[itemKey])) {
relatedDataMap[itemKey] = isDefined(relatedDataMap[itemKey])
? [relatedDataMap[itemKey]]
: []
}
item[relationship.alias] = relatedDataMap[itemKey]
}
})
}
static parseQuery(
graphqlQuery: string,
variables?: Record<string, unknown>
@@ -215,6 +316,27 @@ export class RemoteJoiner {
service_.relationships = relationships
}
// Precompute key arrays for all existing relationships on the service
if (service_.relationships?.size) {
for (const [, relVal] of service_.relationships.entries()) {
if (Array.isArray(relVal)) {
for (let i = 0; i < relVal.length; i++) {
const rel = relVal[i] as ComputedJoinerRelationship
rel.primaryKeyArr = rel.primaryKey.split(",")
rel.foreignKeyArr = rel.foreignKey
.split(",")
.map((fk) => fk.split(".").pop()!)
}
} else if (relVal) {
const rel = relVal as ComputedJoinerRelationship
rel.primaryKeyArr = rel.primaryKey.split(",")
rel.foreignKeyArr = rel.foreignKey
.split(",")
.map((fk) => fk.split(".").pop()!)
}
}
}
// add aliases
const isReadOnlyDefinition =
!isDefined(service_.serviceName) || service_.isReadOnlyLink
@@ -306,7 +428,13 @@ export class RemoteJoiner {
const service_ = expandedRelationships.get(extend.serviceName)!
const aliasName = extend.relationship.alias
const rel = extend.relationship
const rel = extend.relationship as ComputedJoinerRelationship
rel.primaryKeyArr = rel.primaryKey.split(",")
rel.foreignKeyArr = rel.foreignKey
.split(",")
.map((fk) => fk.split(".").pop()!)
if (service_.relationships?.has(aliasName)) {
const existing = service_.relationships.get(aliasName)!
const newRelation = Array.isArray(existing)
@@ -333,7 +461,7 @@ export class RemoteJoiner {
const service_ = this.serviceConfigCache.get(serviceName)!
relationships.forEach((relationship, alias) => {
const rel = relationship as JoinerRelationship
const rel = relationship as ComputedJoinerRelationship
if (service_.relationships?.has(alias)) {
const existing = service_.relationships.get(alias)!
const newRelation = Array.isArray(existing)
@@ -620,9 +748,17 @@ export class RemoteJoiner {
}
const removeChildren = (item: any, prop: string) => {
if (Array.isArray(item)) {
item.forEach((currentItem) => delete currentItem[prop])
for (let i = 0; i < item.length; i++) {
Object.defineProperty(item[i], prop, {
value: undefined,
enumerable: false,
})
}
} else {
delete item[prop]
Object.defineProperty(item, prop, {
value: undefined,
enumerable: false,
})
}
}
@@ -694,17 +830,13 @@ export class RemoteJoiner {
}): Promise<void> {
const { items, parsedExpands, implodeMapping = [], options } = params
if (!parsedExpands) {
if (parsedExpands.size === 0) {
return
}
for (const [expandedPath, expand] of parsedExpands.entries()) {
if (expandedPath === BASE_PATH) {
continue
}
let nestedItems = items
const expandedPathLevels = expandedPath.split(".")
const getItemsForPath = (rootItems: any[], fullPath: string) => {
let nestedItems = rootItems
const expandedPathLevels = fullPath.split(".")
for (let idx = 1; idx < expandedPathLevels.length - 1; idx++) {
nestedItems = RemoteJoiner.getNestedItems(
@@ -713,24 +845,160 @@ export class RemoteJoiner {
)
}
if (nestedItems.length > 0) {
await this.expandProperty({
items: nestedItems,
parentServiceConfig: expand.parentConfig!,
expand,
options,
})
return nestedItems
}
const root = parsedExpands.get(BASE_PATH) as any
const executionStages: {
service: string
paths: string[]
depth: number
}[][] = root?.executionStages
// remove root
root?.executionStages.shift()
for (const stage of executionStages) {
const stageFetchGroups: any[] = []
for (const { paths } of stage) {
const pathCtx: {
path: string
expand: RemoteExpandProperty
relationship: ComputedJoinerRelationship
nestedItems: any[]
field: string
fieldsArray: string[]
args?: any
ids: Set<string>
}[] = []
for (const path of paths) {
const expand = parsedExpands.get(path)!
const nestedItems = getItemsForPath(items, path)
if (!nestedItems?.length || !expand) {
continue
}
const relationship = this.getEntityRelationship({
parentServiceConfig: expand.parentConfig!,
property: expand.property,
entity: expand.entity,
})
if (!relationship) {
continue
}
const { field, fieldsArray, idsToFetch } =
this.computeIdsForRelationship(nestedItems, relationship)
pathCtx.push({
path,
expand,
relationship,
nestedItems,
field,
fieldsArray,
args: expand.args,
ids: idsToFetch,
})
}
if (!pathCtx.length) {
continue
}
// Group by pkField
const byPkField = new Map()
for (const ctx of pathCtx) {
const key = ctx.field
if (!byPkField.has(key)) {
byPkField.set(key, [])
}
byPkField.get(key)!.push(ctx)
}
for (const [pkField, ctxs] of byPkField.entries()) {
const unionIds: any[] = Array.from(
new Set(ctxs.flatMap((c) => Array.from(c.ids)))
)
const unionFields = Array.from(
new Set(ctxs.flatMap((c) => c.expand.fields ?? []))
)
const unionArgs = ctxs.flatMap((c) => c.expand.args ?? [])
const base = ctxs[0].expand
const aggExpand: RemoteExpandProperty = {
...base,
fields: unionFields,
}
if (unionArgs.length) {
aggExpand.args = unionArgs
}
const relationship = ctxs[0].relationship
const promise = this.fetchData({
expand: aggExpand,
pkField,
ids: unionIds,
relationship,
options,
})
stageFetchGroups.push({ ctxs, relationship, promise })
}
}
const stageResults = await Promise.all(
stageFetchGroups.map((g) => g.promise)
)
for (let i = 0; i < stageFetchGroups.length; i++) {
const { ctxs, relationship } = stageFetchGroups[i]
const relatedDataArray = stageResults[i]
const joinFields = relationship.inverse
? relationship.foreignKeyArr
: relationship.primaryKeyArr
const relData = relatedDataArray.path
? (relatedDataArray.data as any)[relatedDataArray.path!]
: relatedDataArray.data
const relatedDataMap = RemoteJoiner.createRelatedDataMap(
relData,
joinFields
)
for (let ci = 0; ci < ctxs.length; ci++) {
const ctx = ctxs[ci]
this.assignRelatedToItems({
items: ctx.nestedItems,
relationship: ctx.relationship,
relatedDataMap,
field: ctx.field,
fieldsArray: ctx.fieldsArray,
})
}
}
}
this.handleFieldAliases({ items, parsedExpands, implodeMapping })
if (implodeMapping.length > 0) {
this.handleFieldAliases({
items,
parsedExpands,
implodeMapping,
})
}
}
private getEntityRelationship(params: {
parentServiceConfig: InternalJoinerServiceConfig
property: string
entity?: string
}): JoinerRelationship {
}): ComputedJoinerRelationship {
const { parentServiceConfig, property, entity } = params
const propEntity = entity ?? parentServiceConfig?.entity
@@ -738,12 +1006,12 @@ export class RemoteJoiner {
if (Array.isArray(rel)) {
if (!propEntity) {
return rel[0]
return rel[0] as ComputedJoinerRelationship
}
const entityRel = rel.find((r) => r.entity === propEntity)
if (entityRel) {
return entityRel
return entityRel as ComputedJoinerRelationship
}
// If entity is not found, return the relationship where the primary key matches
@@ -751,165 +1019,12 @@ export class RemoteJoiner {
entity: propEntity,
})!
return rel.find((r) => serviceEntity.primaryKeys.includes(r.primaryKey))!
return rel.find((r) =>
serviceEntity.primaryKeys.includes(r.primaryKey)
)! as ComputedJoinerRelationship
}
return rel as JoinerRelationship
}
private async expandProperty(params: {
items: any[]
parentServiceConfig: InternalJoinerServiceConfig
expand?: RemoteExpandProperty
options?: RemoteJoinerOptions
}): Promise<void> {
const { items, parentServiceConfig, expand, options } = params
if (!expand) {
return
}
const relationship = this.getEntityRelationship({
parentServiceConfig,
property: expand.property,
entity: expand.entity,
})
if (!relationship) {
return
}
await this.expandRelationshipProperty({
items,
expand,
relationship,
options,
})
}
private async expandRelationshipProperty(params: {
items: any[]
expand: RemoteExpandProperty
relationship: JoinerRelationship
options?: RemoteJoinerOptions
}): Promise<void> {
const { items, expand, relationship, options } = params
const field = relationship.inverse
? relationship.primaryKey
: relationship.foreignKey.split(".").pop()!
const fieldsArray = field.split(",")
const idsToFetch: Set<any> = new Set()
const requestedFields = new Set(expand.fields ?? [])
const fieldsById = new Map<string, string[]>()
items.forEach((item) => {
const values = fieldsArray.map((field) => item?.[field])
if (values.length === fieldsArray.length) {
if (item?.[relationship.alias]) {
for (const field of requestedFields.values()) {
if (field in item[relationship.alias]) {
requestedFields.delete(field)
fieldsById.delete(field)
} else {
if (!fieldsById.has(field)) {
fieldsById.set(field, [])
}
fieldsById
.get(field)!
.push(fieldsArray.length === 1 ? values[0] : values)
}
}
} else {
if (fieldsArray.length === 1) {
idsToFetch.add(values[0])
} else {
idsToFetch.add(values)
}
}
}
})
for (const values of fieldsById.values()) {
values.forEach((val) => {
idsToFetch.add(val)
})
}
if (idsToFetch.size === 0) {
return
}
const relatedDataArray = await this.fetchData({
expand,
pkField: field,
ids: Array.from(idsToFetch),
relationship,
options,
})
const joinFields = relationship.inverse
? relationship.foreignKey.split(",")
: relationship.primaryKey.split(",")
const relData = relatedDataArray.path
? relatedDataArray.data[relatedDataArray.path!]
: relatedDataArray.data
const relatedDataMap = RemoteJoiner.createRelatedDataMap(
relData,
joinFields
)
items.forEach((item) => {
if (!item) {
return
}
const itemKey = fieldsArray.map((field) => item[field]).join(",")
if (item[relationship.alias]) {
if (Array.isArray(item[field])) {
for (let i = 0; i < item[relationship.alias].length; i++) {
const it = item[relationship.alias][i]
item[relationship.alias][i] = Object.assign(
it,
relatedDataMap[it[relationship.primaryKey]]
)
}
return
}
item[relationship.alias] = Object.assign(
item[relationship.alias],
relatedDataMap[itemKey]
)
return
}
if (Array.isArray(item[field])) {
item[relationship.alias] = item[field].map((id) => {
if (relationship.isList && !Array.isArray(relatedDataMap[id])) {
relatedDataMap[id] = isDefined(relatedDataMap[id])
? [relatedDataMap[id]]
: []
}
return relatedDataMap[id]
})
} else {
if (relationship.isList && !Array.isArray(relatedDataMap[itemKey])) {
relatedDataMap[itemKey] = isDefined(relatedDataMap[itemKey])
? [relatedDataMap[itemKey]]
: []
}
item[relationship.alias] = relatedDataMap[itemKey]
}
})
return rel as ComputedJoinerRelationship
}
private parseExpands(
@@ -944,9 +1059,98 @@ export class RemoteJoiner {
const groupedExpands = this.groupExpands(parsedExpands)
this.buildQueryPlan(parsedExpands, groupedExpands)
return groupedExpands
}
private buildQueryPlan(
fullParsedExpands: Map<string, RemoteExpandProperty>,
groupedExpands: Map<string, RemoteExpandProperty>
): void {
const stages: ExecutionStage[][] = []
// Root stage
const rootExp = groupedExpands.get(BASE_PATH)!
const rootService = rootExp.serviceConfig.serviceName
stages.push([
{
service: rootService,
entity: rootExp.entity,
paths: [],
depth: 0,
},
])
// Build service sequence for each path
const getServiceSequence = (path: string): string[] => {
const sequence: string[] = []
let currentPath = path
while (currentPath && currentPath !== BASE_PATH) {
const expand = fullParsedExpands.get(currentPath)
if (!expand) {
break
}
sequence.unshift(expand.serviceConfig.serviceName)
currentPath = expand.parent
}
return sequence
}
// Group paths by their service sequence length and last service in sequence
const pathsBySequenceDepth = new Map<number, Map<string, string[]>>()
for (const [path, expand] of groupedExpands.entries()) {
if (path === BASE_PATH) {
continue
}
const serviceSequence = getServiceSequence(path)
const sequenceDepth = serviceSequence.length
const lastService = expand.serviceConfig.serviceName
if (!pathsBySequenceDepth.has(sequenceDepth)) {
pathsBySequenceDepth.set(sequenceDepth, new Map())
}
const depthMap = pathsBySequenceDepth.get(sequenceDepth)!
if (!depthMap.has(lastService)) {
depthMap.set(lastService, [])
}
depthMap.get(lastService)!.push(path)
}
const maxDepth = Math.max(...Array.from(pathsBySequenceDepth.keys()))
for (let depth = 1; depth <= maxDepth; depth++) {
const serviceMap = pathsBySequenceDepth.get(depth)
if (!serviceMap) {
continue
}
const stageGroups: ExecutionStage[] = []
for (const [service, paths] of serviceMap.entries()) {
stageGroups.push({
service,
paths,
depth: depth,
})
}
if (stageGroups.length > 0) {
stages.push(stageGroups)
}
}
const root = groupedExpands.get(BASE_PATH)!
root.executionStages = stages
}
private parseProperties(params: {
initialService: RemoteExpandProperty
query: RemoteJoinerQuery
@@ -1185,9 +1389,13 @@ export class RemoteJoiner {
}
if (forwardArgumentsOnPath.includes(BASE_PATH + "." + midProp)) {
extraExtends.args = (existingExpand?.args ?? []).concat(
const forwarded = (existingExpand?.args ?? []).concat(
expand?.args ?? []
)
if (forwarded.length) {
extraExtends.args = forwarded
}
}
extMapping.push(extraExtends)
@@ -1275,8 +1483,19 @@ export class RemoteJoiner {
targetExpand = targetExpand.expands[key] ??= {}
}
targetExpand.fields = [...new Set(expand.fields)]
targetExpand.args = expand.args
const nextFields = [
...new Set([
...(targetExpand.fields ?? []),
...(expand.fields ?? []),
]),
]
targetExpand.fields = nextFields
if (expand.args?.length) {
const existingArgs = targetExpand.args
targetExpand.args = existingArgs
? existingArgs.concat(expand.args)
: expand.args
}
mergedExpands.delete(path)
mergedPaths.set(path, expand)
@@ -1459,7 +1678,10 @@ export class RemoteJoiner {
property: key,
})
if (isRel) {
delete shallowProperty[key]
Object.defineProperty(shallowProperty, key, {
value: undefined,
enumerable: false,
})
}
}
@@ -1641,7 +1863,10 @@ function gerPrimaryKeysAndOtherFilters({ serviceConfig, queryObj }): {
value: filters[primaryKeyFilter],
}
delete filters[primaryKeyFilter]
Object.defineProperty(filters, primaryKeyFilter, {
value: undefined,
enumerable: false,
})
}
}