From bd571aca82997ce7ac2014d6746e5593e36cc7ec Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" <37986729+carlos-r-l-rodrigues@users.noreply.github.com> Date: Thu, 4 Sep 2025 11:18:02 -0300 Subject: [PATCH] chore(orchestration): remote joiner query planner (#13364) What: - Added query planning to the Remote Joiner, enabling phased and parallel execution of data aggregation. - Replaced object deletes with non-enumerable property hiding to improve performance. --- .../translation-test/medusa-config.ts | 26 + .../src/links/option-translation.ts | 8 + .../src/links/product-category-translation.ts | 8 + .../src/links/product-translation.ts | 8 + .../src/links/variants-translation.ts | 8 + .../src/modules/translation/index.ts | 8 + .../.snapshot-medusa-translation-module.json | 95 +++ .../migrations/Migration20240907134741.ts | 14 + .../src/modules/translation/models/index.ts | 1 + .../modules/translation/models/translation.ts | 7 + .../src/modules/translation/service.ts | 49 ++ .../__tests__/query-graph/query-graph.ts | 554 ++++++++++++++++ .../src/remote-query/remote-query.ts | 18 + .../__tests__/joiner/remote-joiner-data.ts | 13 +- .../orchestration/src/joiner/remote-joiner.ts | 595 ++++++++++++------ packages/core/types/src/joiner/index.ts | 13 + 16 files changed, 1234 insertions(+), 191 deletions(-) create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/medusa-config.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/option-translation.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-category-translation.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-translation.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/variants-translation.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/index.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/.snapshot-medusa-translation-module.json create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/Migration20240907134741.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/index.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/translation.ts create mode 100644 integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/service.ts create mode 100644 integration-tests/modules/__tests__/query-graph/query-graph.ts diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/medusa-config.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/medusa-config.ts new file mode 100644 index 0000000000..e1b382dc77 --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/medusa-config.ts @@ -0,0 +1,26 @@ +const { defineConfig } = require("@medusajs/framework/utils") + +const DB_HOST = process.env.DB_HOST +const DB_USERNAME = process.env.DB_USERNAME +const DB_PASSWORD = process.env.DB_PASSWORD +const DB_NAME = process.env.DB_TEMP_NAME +const DB_URL = `postgres://${DB_USERNAME}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}` + +process.env.DATABASE_URL = DB_URL + +module.exports = defineConfig({ + admin: { + disable: true, + }, + projectConfig: { + http: { + jwtSecret: "secret", + }, + }, + modules: [ + { + key: "translation", + resolve: "./src/modules/translation", + }, + ], +}) diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/option-translation.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/option-translation.ts new file mode 100644 index 0000000000..822e95583b --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/option-translation.ts @@ -0,0 +1,8 @@ +import ProductModule from "@medusajs/medusa/product" +import { defineLink } from "@medusajs/utils" +import Translation from "../modules/translation" + +export default defineLink( + ProductModule.linkable.productOption.id, + Translation.linkable.translation.id +) diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-category-translation.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-category-translation.ts new file mode 100644 index 0000000000..940e4d4eec --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-category-translation.ts @@ -0,0 +1,8 @@ +import ProductModule from "@medusajs/medusa/product" +import { defineLink } from "@medusajs/utils" +import Translation from "../modules/translation" + +export default defineLink( + ProductModule.linkable.productCategory.id, + Translation.linkable.translation.id +) diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-translation.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-translation.ts new file mode 100644 index 0000000000..883789d998 --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/product-translation.ts @@ -0,0 +1,8 @@ +import { defineLink } from "@medusajs/framework/utils" +import ProductModule from "@medusajs/medusa/product" +import Translation from "../modules/translation" + +export default defineLink( + ProductModule.linkable.product.id, + Translation.linkable.translation.id +) diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/variants-translation.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/variants-translation.ts new file mode 100644 index 0000000000..565a978c1e --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/links/variants-translation.ts @@ -0,0 +1,8 @@ +import ProductModule from "@medusajs/medusa/product" +import { defineLink } from "@medusajs/utils" +import Translation from "../modules/translation" + +export default defineLink( + ProductModule.linkable.productVariant.id, + Translation.linkable.translation.id +) diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/index.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/index.ts new file mode 100644 index 0000000000..d56166e45a --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/index.ts @@ -0,0 +1,8 @@ +import { Module } from "@medusajs/framework/utils"; +import { TranslationModule } from "./service"; + +export const TRANSLATION = "translation"; + +export default Module(TRANSLATION, { + service: TranslationModule, +}); diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/.snapshot-medusa-translation-module.json b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/.snapshot-medusa-translation-module.json new file mode 100644 index 0000000000..2d3c9418b3 --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/.snapshot-medusa-translation-module.json @@ -0,0 +1,95 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "key": { + "name": "key", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "value": { + "name": "value", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "default": "'{}'", + "mappedType": "json" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "translation", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_translation_key_unique", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_translation_key_unique\" ON \"translation\" (key) WHERE deleted_at IS NULL" + }, + { + "keyName": "translation_pkey", + "columnNames": [ + "id" + ], + "composite": false, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/Migration20240907134741.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/Migration20240907134741.ts new file mode 100644 index 0000000000..0f0825ce08 --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/migrations/Migration20240907134741.ts @@ -0,0 +1,14 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20240907134741 extends Migration { + + async up(): Promise { + this.addSql('create table if not exists "translation" ("id" text not null, "key" text not null, "value" jsonb not null default \'{}\', "created_at" timestamptz not null default now(), "updated_at" timestamptz not null default now(), "deleted_at" timestamptz null, constraint "translation_pkey" primary key ("id"));'); + this.addSql('CREATE UNIQUE INDEX IF NOT EXISTS "IDX_translation_key_unique" ON "translation" (key) WHERE deleted_at IS NULL;'); + } + + async down(): Promise { + this.addSql('drop table if exists "translation" cascade;'); + } + +} diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/index.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/index.ts new file mode 100644 index 0000000000..79d0f1686e --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/index.ts @@ -0,0 +1 @@ +export { default as Translation } from "./translation"; diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/translation.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/translation.ts new file mode 100644 index 0000000000..b9808be380 --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/models/translation.ts @@ -0,0 +1,7 @@ +import { model } from "@medusajs/framework/utils"; + +export default model.define("translation", { + id: model.id({ prefix: "i18n" }).primaryKey(), + key: model.text().unique(), + value: model.json().default({}), +}); diff --git a/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/service.ts b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/service.ts new file mode 100644 index 0000000000..db7e2d321f --- /dev/null +++ b/integration-tests/modules/__tests__/__fixtures__/translation-test/src/modules/translation/service.ts @@ -0,0 +1,49 @@ +import { MedusaService } from "@medusajs/framework/utils" +import { Translation } from "./models" + +export class TranslationModule extends MedusaService({ + Translation, +}) { + private manager_ + + constructor({ manager }) { + super(...arguments) + + this.manager_ = manager + } + + // @ts-expect-error + async listTranslations(find, config, medusaContext) { + const { filters, context, id } = find ?? {} + let lang = null + + if (filters || context) { + lang = filters?.lang ?? context?.lang + delete filters?.lang + } + + const knex = this.manager_.getKnex() + + const q = knex({ tr: "translation" }).select(["tr.id", "tr.key"]) + + // Select JSON content for a specific lang if provided + if (lang) { + q.select( + knex.raw("tr.value->? AS content", [lang]), + knex.raw("? AS lang", [lang]) + ) + } else { + q.select("tr.value") + } + + const key = filters?.key + if (id) { + q.whereIn("tr.id", Array.isArray(id) ? id : [id]) + } else if (key) { + q.whereIn("tr.key", Array.isArray(key) ? key : [key]) + } + + // console.log(q.toString()) + return await q + } +} diff --git a/integration-tests/modules/__tests__/query-graph/query-graph.ts b/integration-tests/modules/__tests__/query-graph/query-graph.ts new file mode 100644 index 0000000000..7a62ca2da4 --- /dev/null +++ b/integration-tests/modules/__tests__/query-graph/query-graph.ts @@ -0,0 +1,554 @@ +import { medusaIntegrationTestRunner } from "@medusajs/test-utils" +import path from "path" + +jest.setTimeout(100000) + +import { createProductsWorkflow } from "@medusajs/core-flows" +import { Modules } from "@medusajs/utils" +import { TranslationModule } from "../__fixtures__/translation-test/src/modules/translation/service" + +const createTranslations = async (container, inputs) => { + const translationModule: any = container.resolve("translation") + + const created = await translationModule.createTranslations(inputs as any) + return Array.isArray(created) ? created : [created] +} + +const attachTranslationToProduct = async ( + container, + { productId, translation } +) => { + const [created] = await createTranslations(container, [translation]) + + const remoteLink: any = container.resolve("remoteLink") + await remoteLink.create({ + [Modules.PRODUCT]: { product_id: productId }, + translation: { translation_id: created.id }, + }) + + return created +} + +const attachTranslationToVariant = async ( + container, + { variantId, translation } +) => { + const [created] = await createTranslations(container, [translation]) + + const remoteLink: any = container.resolve("remoteLink") + await remoteLink.create({ + [Modules.PRODUCT]: { product_variant_id: variantId }, + translation: { translation_id: created.id }, + }) + + return created +} + +const attachTranslationToOption = async ( + container, + { optionId, translation } +) => { + const [created] = await createTranslations(container, [translation]) + + const remoteLink: any = container.resolve("remoteLink") + await remoteLink.create({ + [Modules.PRODUCT]: { product_option_id: optionId }, + translation: { translation_id: created.id }, + }) + + return created +} + +const attachTranslationToProductCategory = async ( + container, + { categoryId, translation } +) => { + const [created] = await createTranslations(container, [translation]) + + const remoteLink: any = container.resolve("remoteLink") + await remoteLink.create({ + [Modules.PRODUCT]: { product_category_id: categoryId }, + translation: { translation_id: created.id }, + }) + + return created +} + +medusaIntegrationTestRunner({ + cwd: path.join(__dirname, "../__fixtures__/translation-test"), + testSuite: ({ getContainer }) => { + describe("query.graph()", () => { + beforeEach(async () => { + const container = getContainer() + const productService: any = container.resolve("product") + + const categories = await Promise.all( + [1, 2, 3].map((i) => + productService.createProductCategories({ + name: `Category ${i}`, + }) + ) + ) + + const buildProduct = (i: number, categoryId: string) => ({ + title: `Product ${i}`, + category_ids: [categoryId], + options: [ + { + title: "size", + values: ["small", "large"], + }, + ], + variants: [ + { + title: `P${i} Variant 1`, + options: { size: "small" }, + prices: [ + { + amount: 10, + currency_code: "usd", + }, + ], + }, + { + title: `P${i} Variant 2`, + options: { size: "large" }, + prices: [ + { + amount: 20, + currency_code: "usd", + }, + ], + }, + ], + }) + + const createdProducts = await Promise.all( + [1, 2, 3].map( + async (i) => + await createProductsWorkflow(container).run({ + input: { + products: [buildProduct(i, categories[i - 1].id)], + }, + }) + ) + ) + + const productsWithRels = await Promise.all( + createdProducts.map((p) => + productService.retrieveProduct(p.result[0].id, { + relations: [ + "variants", + "options", + "options.values", + "categories", + ], + }) + ) + ) + + await Promise.all( + productsWithRels.map(async (p, idx) => { + const i = idx + 1 + await attachTranslationToProduct(getContainer(), { + productId: p.id, + translation: { + key: p.id, + value: { + pt: { title: `Produto ${i}` }, + fr: { title: `Produit ${i}` }, + }, + }, + }) + + const cat = p.categories?.[0] + if (cat) { + await attachTranslationToProductCategory(getContainer(), { + categoryId: cat.id, + translation: { + key: cat.id, + value: { + pt: { name: `Categoria ${i}` }, + fr: { name: `Catégorie ${i}` }, + }, + }, + }) + } + + const opt = p.options?.[0] + if (opt) { + await attachTranslationToOption(getContainer(), { + optionId: opt.id, + translation: { + key: opt.id, + value: { + pt: { title: "Tamanho" }, + fr: { title: "Taille" }, + }, + }, + }) + } + + await Promise.all( + (p.variants || []).map((v, vi) => { + const variantNumber = v.title.split("").pop() + return attachTranslationToVariant(getContainer(), { + variantId: v.id, + translation: { + key: v.id, + value: { + pt: { title: `Variante ${variantNumber}` }, + fr: { title: `Variante ${variantNumber}` }, + }, + }, + }) + }) + ) + }) + ) + }) + + it("should call same entity in different levels (variant)", async () => { + const container = getContainer() + const query = container.resolve("query") + const productService = container.resolve(Modules.PRODUCT) + const inventoryService = container.resolve(Modules.INVENTORY) + + const productServiceSpy = jest.spyOn( + productService, + "listProductVariants" + ) + const inventoryServiceSpy = jest.spyOn( + inventoryService, + "listInventoryItems" + ) + + const result = await query.graph({ + entity: "variants", + fields: [ + "id", + "manage_inventory", + "inventory.id", + "inventory.variants.id", + ], + }) + + expect(productServiceSpy).toHaveBeenCalledTimes(2) + expect(inventoryServiceSpy).toHaveBeenCalledTimes(1) + }) + + it("should call services in correct order with parallel execution where possible", async () => { + const container = getContainer() + + const query = container.resolve("query") + const productService = container.resolve(Modules.PRODUCT) + const priceService = container.resolve(Modules.PRICING) + const translationService = container.resolve( + "translation" + ) as TranslationModule + + const productServiceSpy = jest.spyOn(productService, "listProducts") + const translationServiceSpy = jest.spyOn( + translationService, + "listTranslations" + ) + const priceServiceSpy = jest.spyOn(priceService, "listPriceSets") + + // Execute the query + const result = await query.graph({ + entity: "product", + fields: [ + "sales_channels.name", + "title", + "translation.*", + "categories.name", + "categories.translation.*", + "variants.title", + "variants.translation.*", + "options.title", + "options.translation.*", + "variants.prices.amount", + "variants.prices.currency_code", + ], + }) + + expect(productServiceSpy.mock.calls[0][1]).toEqual({ + select: [ + "title", + "variants_id", + "id", + "categories.name", + "categories.id", + "variants.title", + "variants.id", + "options.title", + "options.id", + ], + relations: ["categories", "variants", "options"], + args: {}, + }) + + expect(translationServiceSpy.mock.calls[0][0].id).toHaveLength(3) + expect(translationServiceSpy.mock.calls[1][0].id).toHaveLength(12) + expect(priceServiceSpy.mock.calls[0][0].id).toHaveLength(6) + + expect(result.data).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + title: "Product 3", + categories: [ + expect.objectContaining({ + name: "Category 3", + translation: expect.objectContaining({ + value: { + fr: { + name: "Catégorie 3", + }, + pt: { + name: "Categoria 3", + }, + }, + }), + }), + ], + variants: expect.arrayContaining([ + expect.objectContaining({ + title: "P3 Variant 2", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 2", + }, + pt: { + title: "Variante 2", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 20, + currency_code: "usd", + }), + ]), + }), + expect.objectContaining({ + title: "P3 Variant 1", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 1", + }, + pt: { + title: "Variante 1", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 10, + currency_code: "usd", + }), + ]), + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + title: "size", + translation: expect.objectContaining({ + value: { + fr: { + title: "Taille", + }, + pt: { + title: "Tamanho", + }, + }, + }), + }), + ]), + sales_channels: [], + translation: expect.objectContaining({ + value: { + fr: { + title: "Produit 3", + }, + pt: { + title: "Produto 3", + }, + }, + }), + }), + expect.objectContaining({ + title: "Product 1", + categories: [ + expect.objectContaining({ + name: "Category 1", + translation: expect.objectContaining({ + value: { + fr: { + name: "Catégorie 1", + }, + pt: { + name: "Categoria 1", + }, + }, + }), + }), + ], + variants: expect.arrayContaining([ + expect.objectContaining({ + title: "P1 Variant 2", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 2", + }, + pt: { + title: "Variante 2", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 20, + currency_code: "usd", + }), + ]), + }), + expect.objectContaining({ + title: "P1 Variant 1", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 1", + }, + pt: { + title: "Variante 1", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 10, + currency_code: "usd", + }), + ]), + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + title: "size", + translation: expect.objectContaining({ + value: { + fr: { + title: "Taille", + }, + pt: { + title: "Tamanho", + }, + }, + }), + }), + ]), + sales_channels: [], + translation: expect.objectContaining({ + value: { + fr: { + title: "Produit 1", + }, + pt: { + title: "Produto 1", + }, + }, + }), + }), + expect.objectContaining({ + title: "Product 2", + categories: [ + expect.objectContaining({ + name: "Category 2", + translation: expect.objectContaining({ + value: { + fr: { + name: "Catégorie 2", + }, + pt: { + name: "Categoria 2", + }, + }, + }), + }), + ], + variants: expect.arrayContaining([ + expect.objectContaining({ + title: "P2 Variant 1", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 1", + }, + pt: { + title: "Variante 1", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 10, + currency_code: "usd", + }), + ]), + }), + expect.objectContaining({ + title: "P2 Variant 2", + translation: expect.objectContaining({ + value: { + fr: { + title: "Variante 2", + }, + pt: { + title: "Variante 2", + }, + }, + }), + prices: expect.arrayContaining([ + expect.objectContaining({ + amount: 20, + currency_code: "usd", + }), + ]), + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + title: "size", + translation: expect.objectContaining({ + value: { + fr: { + title: "Taille", + }, + pt: { + title: "Tamanho", + }, + }, + }), + }), + ]), + sales_channels: [], + translation: expect.objectContaining({ + value: { + fr: { + title: "Produit 2", + }, + pt: { + title: "Produto 2", + }, + }, + }), + }), + ]) + ) + }) + }) + }, +}) diff --git a/packages/core/modules-sdk/src/remote-query/remote-query.ts b/packages/core/modules-sdk/src/remote-query/remote-query.ts index a6d8ce8256..1c4fc69a2c 100644 --- a/packages/core/modules-sdk/src/remote-query/remote-query.ts +++ b/packages/core/modules-sdk/src/remote-query/remote-query.ts @@ -296,6 +296,24 @@ export class RemoteQuery { } } + return this.executeFetchRequest({ + expand, + keyField, + ids, + relationship, + }) + } + + private async executeFetchRequest(params: { + expand: RemoteExpandProperty + keyField: string + ids?: (unknown | unknown[])[] + relationship?: JoinerRelationship + }): Promise<{ + data: unknown[] | { [path: string]: unknown } + path?: string + }> { + const { expand, keyField, ids, relationship } = params const serviceConfig = expand.serviceConfig const service = this.modulesMap.get(serviceConfig.serviceName)! diff --git a/packages/core/orchestration/src/__tests__/joiner/remote-joiner-data.ts b/packages/core/orchestration/src/__tests__/joiner/remote-joiner-data.ts index 86f98d39b0..b5b8d09c99 100644 --- a/packages/core/orchestration/src/__tests__/joiner/remote-joiner-data.ts +++ b/packages/core/orchestration/src/__tests__/joiner/remote-joiner-data.ts @@ -664,18 +664,19 @@ describe("RemoteJoiner", () => { ], }, ], - [ - { - service: "user", - fieds: ["name", "id"], - }, - ], [ { service: "variantService", fieds: ["id", "product_id"], }, ], + [ + { + service: "user", + fieds: ["name", "id"], + }, + ], + [ { service: "product", diff --git a/packages/core/orchestration/src/joiner/remote-joiner.ts b/packages/core/orchestration/src/joiner/remote-joiner.ts index cecc66fe61..95116af412 100644 --- a/packages/core/orchestration/src/joiner/remote-joiner.ts +++ b/packages/core/orchestration/src/joiner/remote-joiner.ts @@ -1,4 +1,6 @@ import { + ComputedJoinerRelationship, + ExecutionStage, InternalJoinerServiceConfig, JoinerRelationship, JoinerServiceConfigAlias, @@ -148,6 +150,105 @@ export class RemoteJoiner { }, {}) } + // compute ids to fetch for a relationship + private computeIdsForRelationship( + items: any[], + relationship: ComputedJoinerRelationship + ) { + const field = relationship.inverse + ? relationship.primaryKey + : relationship.foreignKey.split(".").pop()! + + const fieldsArray = relationship.inverse + ? relationship.primaryKeyArr + : relationship.foreignKeyArr + + const idsToFetch: Set = new Set() + for (let i = 0; i < items.length; i++) { + const item = items[i] + if (!item) { + continue + } + const values = fieldsArray.map((f) => item?.[f]) + if (values.length !== fieldsArray.length) { + continue + } + + if (fieldsArray.length === 1) { + const val = values[0] + if (Array.isArray(val)) { + for (let x = 0; x < val.length; x++) { + idsToFetch.add(val[x]) + } + } else { + idsToFetch.add(val) + } + } else { + idsToFetch.add(values) + } + } + + return { field, fieldsArray, idsToFetch } + } + + // assign fetched related data to items + private assignRelatedToItems(params: { + items: any[] + relationship: ComputedJoinerRelationship + relatedDataMap: Map + field: string + fieldsArray: string[] + }) { + const { items, relationship, relatedDataMap, field, fieldsArray } = params + + items.forEach((item) => { + if (!item) { + return + } + + const itemKey = fieldsArray.map((f) => item[f]).join(",") + + if (item[relationship.alias]) { + if (Array.isArray(item[field])) { + for (let i = 0; i < item[relationship.alias].length; i++) { + const it = item[relationship.alias][i] + item[relationship.alias][i] = Object.assign( + it, + relatedDataMap[it[relationship.primaryKey]] + ) + } + return + } + + item[relationship.alias] = Object.assign( + item[relationship.alias], + relatedDataMap[itemKey] + ) + return + } + + if (Array.isArray(item[field])) { + item[relationship.alias] = item[field].map((id) => { + if (relationship.isList && !Array.isArray(relatedDataMap[id])) { + relatedDataMap[id] = isDefined(relatedDataMap[id]) + ? [relatedDataMap[id]] + : [] + } + + return relatedDataMap[id] + }) + } else { + if (relationship.isList && !Array.isArray(relatedDataMap[itemKey])) { + relatedDataMap[itemKey] = isDefined(relatedDataMap[itemKey]) + ? [relatedDataMap[itemKey]] + : [] + } + + item[relationship.alias] = relatedDataMap[itemKey] + } + }) + } + static parseQuery( graphqlQuery: string, variables?: Record @@ -215,6 +316,27 @@ export class RemoteJoiner { service_.relationships = relationships } + // Precompute key arrays for all existing relationships on the service + if (service_.relationships?.size) { + for (const [, relVal] of service_.relationships.entries()) { + if (Array.isArray(relVal)) { + for (let i = 0; i < relVal.length; i++) { + const rel = relVal[i] as ComputedJoinerRelationship + rel.primaryKeyArr = rel.primaryKey.split(",") + rel.foreignKeyArr = rel.foreignKey + .split(",") + .map((fk) => fk.split(".").pop()!) + } + } else if (relVal) { + const rel = relVal as ComputedJoinerRelationship + rel.primaryKeyArr = rel.primaryKey.split(",") + rel.foreignKeyArr = rel.foreignKey + .split(",") + .map((fk) => fk.split(".").pop()!) + } + } + } + // add aliases const isReadOnlyDefinition = !isDefined(service_.serviceName) || service_.isReadOnlyLink @@ -306,7 +428,13 @@ export class RemoteJoiner { const service_ = expandedRelationships.get(extend.serviceName)! const aliasName = extend.relationship.alias - const rel = extend.relationship + const rel = extend.relationship as ComputedJoinerRelationship + + rel.primaryKeyArr = rel.primaryKey.split(",") + rel.foreignKeyArr = rel.foreignKey + .split(",") + .map((fk) => fk.split(".").pop()!) + if (service_.relationships?.has(aliasName)) { const existing = service_.relationships.get(aliasName)! const newRelation = Array.isArray(existing) @@ -333,7 +461,7 @@ export class RemoteJoiner { const service_ = this.serviceConfigCache.get(serviceName)! relationships.forEach((relationship, alias) => { - const rel = relationship as JoinerRelationship + const rel = relationship as ComputedJoinerRelationship if (service_.relationships?.has(alias)) { const existing = service_.relationships.get(alias)! const newRelation = Array.isArray(existing) @@ -620,9 +748,17 @@ export class RemoteJoiner { } const removeChildren = (item: any, prop: string) => { if (Array.isArray(item)) { - item.forEach((currentItem) => delete currentItem[prop]) + for (let i = 0; i < item.length; i++) { + Object.defineProperty(item[i], prop, { + value: undefined, + enumerable: false, + }) + } } else { - delete item[prop] + Object.defineProperty(item, prop, { + value: undefined, + enumerable: false, + }) } } @@ -694,17 +830,13 @@ export class RemoteJoiner { }): Promise { const { items, parsedExpands, implodeMapping = [], options } = params - if (!parsedExpands) { + if (parsedExpands.size === 0) { return } - for (const [expandedPath, expand] of parsedExpands.entries()) { - if (expandedPath === BASE_PATH) { - continue - } - - let nestedItems = items - const expandedPathLevels = expandedPath.split(".") + const getItemsForPath = (rootItems: any[], fullPath: string) => { + let nestedItems = rootItems + const expandedPathLevels = fullPath.split(".") for (let idx = 1; idx < expandedPathLevels.length - 1; idx++) { nestedItems = RemoteJoiner.getNestedItems( @@ -713,24 +845,160 @@ export class RemoteJoiner { ) } - if (nestedItems.length > 0) { - await this.expandProperty({ - items: nestedItems, - parentServiceConfig: expand.parentConfig!, - expand, - options, - }) + return nestedItems + } + + const root = parsedExpands.get(BASE_PATH) as any + const executionStages: { + service: string + paths: string[] + depth: number + }[][] = root?.executionStages + // remove root + root?.executionStages.shift() + + for (const stage of executionStages) { + const stageFetchGroups: any[] = [] + for (const { paths } of stage) { + const pathCtx: { + path: string + expand: RemoteExpandProperty + relationship: ComputedJoinerRelationship + nestedItems: any[] + field: string + fieldsArray: string[] + args?: any + ids: Set + }[] = [] + + for (const path of paths) { + const expand = parsedExpands.get(path)! + const nestedItems = getItemsForPath(items, path) + + if (!nestedItems?.length || !expand) { + continue + } + + const relationship = this.getEntityRelationship({ + parentServiceConfig: expand.parentConfig!, + property: expand.property, + entity: expand.entity, + }) + + if (!relationship) { + continue + } + + const { field, fieldsArray, idsToFetch } = + this.computeIdsForRelationship(nestedItems, relationship) + + pathCtx.push({ + path, + expand, + relationship, + nestedItems, + field, + fieldsArray, + args: expand.args, + ids: idsToFetch, + }) + } + + if (!pathCtx.length) { + continue + } + + // Group by pkField + const byPkField = new Map() + for (const ctx of pathCtx) { + const key = ctx.field + if (!byPkField.has(key)) { + byPkField.set(key, []) + } + byPkField.get(key)!.push(ctx) + } + + for (const [pkField, ctxs] of byPkField.entries()) { + const unionIds: any[] = Array.from( + new Set(ctxs.flatMap((c) => Array.from(c.ids))) + ) + const unionFields = Array.from( + new Set(ctxs.flatMap((c) => c.expand.fields ?? [])) + ) + const unionArgs = ctxs.flatMap((c) => c.expand.args ?? []) + + const base = ctxs[0].expand + const aggExpand: RemoteExpandProperty = { + ...base, + fields: unionFields, + } + + if (unionArgs.length) { + aggExpand.args = unionArgs + } + + const relationship = ctxs[0].relationship + + const promise = this.fetchData({ + expand: aggExpand, + pkField, + ids: unionIds, + relationship, + options, + }) + + stageFetchGroups.push({ ctxs, relationship, promise }) + } + } + + const stageResults = await Promise.all( + stageFetchGroups.map((g) => g.promise) + ) + + for (let i = 0; i < stageFetchGroups.length; i++) { + const { ctxs, relationship } = stageFetchGroups[i] + const relatedDataArray = stageResults[i] + + const joinFields = relationship.inverse + ? relationship.foreignKeyArr + : relationship.primaryKeyArr + + const relData = relatedDataArray.path + ? (relatedDataArray.data as any)[relatedDataArray.path!] + : relatedDataArray.data + + const relatedDataMap = RemoteJoiner.createRelatedDataMap( + relData, + joinFields + ) + + for (let ci = 0; ci < ctxs.length; ci++) { + const ctx = ctxs[ci] + this.assignRelatedToItems({ + items: ctx.nestedItems, + relationship: ctx.relationship, + relatedDataMap, + field: ctx.field, + fieldsArray: ctx.fieldsArray, + }) + } } } - this.handleFieldAliases({ items, parsedExpands, implodeMapping }) + if (implodeMapping.length > 0) { + this.handleFieldAliases({ + items, + parsedExpands, + implodeMapping, + }) + } } private getEntityRelationship(params: { parentServiceConfig: InternalJoinerServiceConfig property: string entity?: string - }): JoinerRelationship { + }): ComputedJoinerRelationship { const { parentServiceConfig, property, entity } = params const propEntity = entity ?? parentServiceConfig?.entity @@ -738,12 +1006,12 @@ export class RemoteJoiner { if (Array.isArray(rel)) { if (!propEntity) { - return rel[0] + return rel[0] as ComputedJoinerRelationship } const entityRel = rel.find((r) => r.entity === propEntity) if (entityRel) { - return entityRel + return entityRel as ComputedJoinerRelationship } // If entity is not found, return the relationship where the primary key matches @@ -751,165 +1019,12 @@ export class RemoteJoiner { entity: propEntity, })! - return rel.find((r) => serviceEntity.primaryKeys.includes(r.primaryKey))! + return rel.find((r) => + serviceEntity.primaryKeys.includes(r.primaryKey) + )! as ComputedJoinerRelationship } - return rel as JoinerRelationship - } - - private async expandProperty(params: { - items: any[] - parentServiceConfig: InternalJoinerServiceConfig - expand?: RemoteExpandProperty - options?: RemoteJoinerOptions - }): Promise { - const { items, parentServiceConfig, expand, options } = params - - if (!expand) { - return - } - - const relationship = this.getEntityRelationship({ - parentServiceConfig, - property: expand.property, - entity: expand.entity, - }) - - if (!relationship) { - return - } - - await this.expandRelationshipProperty({ - items, - expand, - relationship, - options, - }) - } - - private async expandRelationshipProperty(params: { - items: any[] - expand: RemoteExpandProperty - relationship: JoinerRelationship - options?: RemoteJoinerOptions - }): Promise { - const { items, expand, relationship, options } = params - - const field = relationship.inverse - ? relationship.primaryKey - : relationship.foreignKey.split(".").pop()! - const fieldsArray = field.split(",") - - const idsToFetch: Set = new Set() - - const requestedFields = new Set(expand.fields ?? []) - const fieldsById = new Map() - items.forEach((item) => { - const values = fieldsArray.map((field) => item?.[field]) - - if (values.length === fieldsArray.length) { - if (item?.[relationship.alias]) { - for (const field of requestedFields.values()) { - if (field in item[relationship.alias]) { - requestedFields.delete(field) - fieldsById.delete(field) - } else { - if (!fieldsById.has(field)) { - fieldsById.set(field, []) - } - - fieldsById - .get(field)! - .push(fieldsArray.length === 1 ? values[0] : values) - } - } - } else { - if (fieldsArray.length === 1) { - idsToFetch.add(values[0]) - } else { - idsToFetch.add(values) - } - } - } - }) - - for (const values of fieldsById.values()) { - values.forEach((val) => { - idsToFetch.add(val) - }) - } - - if (idsToFetch.size === 0) { - return - } - - const relatedDataArray = await this.fetchData({ - expand, - pkField: field, - ids: Array.from(idsToFetch), - relationship, - options, - }) - - const joinFields = relationship.inverse - ? relationship.foreignKey.split(",") - : relationship.primaryKey.split(",") - - const relData = relatedDataArray.path - ? relatedDataArray.data[relatedDataArray.path!] - : relatedDataArray.data - - const relatedDataMap = RemoteJoiner.createRelatedDataMap( - relData, - joinFields - ) - - items.forEach((item) => { - if (!item) { - return - } - - const itemKey = fieldsArray.map((field) => item[field]).join(",") - - if (item[relationship.alias]) { - if (Array.isArray(item[field])) { - for (let i = 0; i < item[relationship.alias].length; i++) { - const it = item[relationship.alias][i] - item[relationship.alias][i] = Object.assign( - it, - relatedDataMap[it[relationship.primaryKey]] - ) - } - return - } - - item[relationship.alias] = Object.assign( - item[relationship.alias], - relatedDataMap[itemKey] - ) - return - } - - if (Array.isArray(item[field])) { - item[relationship.alias] = item[field].map((id) => { - if (relationship.isList && !Array.isArray(relatedDataMap[id])) { - relatedDataMap[id] = isDefined(relatedDataMap[id]) - ? [relatedDataMap[id]] - : [] - } - - return relatedDataMap[id] - }) - } else { - if (relationship.isList && !Array.isArray(relatedDataMap[itemKey])) { - relatedDataMap[itemKey] = isDefined(relatedDataMap[itemKey]) - ? [relatedDataMap[itemKey]] - : [] - } - - item[relationship.alias] = relatedDataMap[itemKey] - } - }) + return rel as ComputedJoinerRelationship } private parseExpands( @@ -944,9 +1059,98 @@ export class RemoteJoiner { const groupedExpands = this.groupExpands(parsedExpands) + this.buildQueryPlan(parsedExpands, groupedExpands) + return groupedExpands } + private buildQueryPlan( + fullParsedExpands: Map, + groupedExpands: Map + ): void { + const stages: ExecutionStage[][] = [] + + // Root stage + const rootExp = groupedExpands.get(BASE_PATH)! + const rootService = rootExp.serviceConfig.serviceName + + stages.push([ + { + service: rootService, + entity: rootExp.entity, + paths: [], + depth: 0, + }, + ]) + + // Build service sequence for each path + const getServiceSequence = (path: string): string[] => { + const sequence: string[] = [] + let currentPath = path + + while (currentPath && currentPath !== BASE_PATH) { + const expand = fullParsedExpands.get(currentPath) + if (!expand) { + break + } + + sequence.unshift(expand.serviceConfig.serviceName) + currentPath = expand.parent + } + + return sequence + } + + // Group paths by their service sequence length and last service in sequence + const pathsBySequenceDepth = new Map>() + + for (const [path, expand] of groupedExpands.entries()) { + if (path === BASE_PATH) { + continue + } + + const serviceSequence = getServiceSequence(path) + const sequenceDepth = serviceSequence.length + const lastService = expand.serviceConfig.serviceName + + if (!pathsBySequenceDepth.has(sequenceDepth)) { + pathsBySequenceDepth.set(sequenceDepth, new Map()) + } + + const depthMap = pathsBySequenceDepth.get(sequenceDepth)! + if (!depthMap.has(lastService)) { + depthMap.set(lastService, []) + } + + depthMap.get(lastService)!.push(path) + } + + const maxDepth = Math.max(...Array.from(pathsBySequenceDepth.keys())) + + for (let depth = 1; depth <= maxDepth; depth++) { + const serviceMap = pathsBySequenceDepth.get(depth) + if (!serviceMap) { + continue + } + + const stageGroups: ExecutionStage[] = [] + for (const [service, paths] of serviceMap.entries()) { + stageGroups.push({ + service, + paths, + depth: depth, + }) + } + + if (stageGroups.length > 0) { + stages.push(stageGroups) + } + } + + const root = groupedExpands.get(BASE_PATH)! + root.executionStages = stages + } + private parseProperties(params: { initialService: RemoteExpandProperty query: RemoteJoinerQuery @@ -1185,9 +1389,13 @@ export class RemoteJoiner { } if (forwardArgumentsOnPath.includes(BASE_PATH + "." + midProp)) { - extraExtends.args = (existingExpand?.args ?? []).concat( + const forwarded = (existingExpand?.args ?? []).concat( expand?.args ?? [] ) + + if (forwarded.length) { + extraExtends.args = forwarded + } } extMapping.push(extraExtends) @@ -1275,8 +1483,19 @@ export class RemoteJoiner { targetExpand = targetExpand.expands[key] ??= {} } - targetExpand.fields = [...new Set(expand.fields)] - targetExpand.args = expand.args + const nextFields = [ + ...new Set([ + ...(targetExpand.fields ?? []), + ...(expand.fields ?? []), + ]), + ] + targetExpand.fields = nextFields + if (expand.args?.length) { + const existingArgs = targetExpand.args + targetExpand.args = existingArgs + ? existingArgs.concat(expand.args) + : expand.args + } mergedExpands.delete(path) mergedPaths.set(path, expand) @@ -1459,7 +1678,10 @@ export class RemoteJoiner { property: key, }) if (isRel) { - delete shallowProperty[key] + Object.defineProperty(shallowProperty, key, { + value: undefined, + enumerable: false, + }) } } @@ -1641,7 +1863,10 @@ function gerPrimaryKeysAndOtherFilters({ serviceConfig, queryObj }): { value: filters[primaryKeyFilter], } - delete filters[primaryKeyFilter] + Object.defineProperty(filters, primaryKeyFilter, { + value: undefined, + enumerable: false, + }) } } diff --git a/packages/core/types/src/joiner/index.ts b/packages/core/types/src/joiner/index.ts index 781ff68b89..641e3a979e 100644 --- a/packages/core/types/src/joiner/index.ts +++ b/packages/core/types/src/joiner/index.ts @@ -18,6 +18,11 @@ export type JoinerRelationship = { args?: Record } +export type ComputedJoinerRelationship = JoinerRelationship & { + primaryKeyArr: string[] + foreignKeyArr: string[] +} + export interface JoinerServiceConfigAlias { name: string | string[] entity?: string @@ -105,7 +110,15 @@ export type InternalJoinerServiceConfig = Omit< entity?: string } +export type ExecutionStage = { + service: string + entity?: string + paths: string[] + depth: number +} + export interface RemoteExpandProperty { + executionStages?: ExecutionStage[][] property: string parent: string parentConfig?: InternalJoinerServiceConfig