From fca5ad77b41856867ec68b1e46d04f1bb71cbc76 Mon Sep 17 00:00:00 2001 From: Harminder Virk Date: Tue, 20 May 2025 18:03:18 +0530 Subject: [PATCH] feat: process import from pre-processed chunks (#12527) Fixes: FRMW-2968 In this PR we have done two major things. - First, we remove storing CSV contents within the workflow storage and neither store the JSON payloads to be created/updated in workflows. Earlier, they all were workflow inputs, hence were stored in the workflow - Introduce a naive concept of chunks and process chunks one by one. The next PR making chunking a bit more robust while using streams, adding ability to resume from the failed chunk and so on. > [!IMPORTANT] > The new endpoint `/admin/product/imports` is not in use yet. But it will be after the next (final) PR. ## Old context in workflow storage ![CleanShot 2025-05-19 at 17 11 08@2x](https://github.com/user-attachments/assets/798bdcc9-a368-4c1f-afdd-2a77f5ce43e0) ## New context in workflow storage ![CleanShot 2025-05-19 at 17 15 08@2x](https://github.com/user-attachments/assets/0463d035-403f-4600-a9cd-5af24d5fee7c) --- .changeset/hot-flies-ring.md | 8 + .../product/admin/product-export.spec.ts | 3 +- .../product/admin/product-import.spec.ts | 1 - .../product/admin/product-imports.spec.ts | 610 ++++++++++++++++++ integration-tests/http/medusa-config.js | 1 + .../core-flows/src/product/steps/index.ts | 2 + .../steps/normalize-products-to-chunks.ts | 71 ++ .../product/steps/process-import-chunks.ts | 28 + .../workflows/import-products-as-chunks.ts | 137 ++++ .../core-flows/src/product/workflows/index.ts | 1 + packages/core/js-sdk/src/admin/product.ts | 2 +- .../imports/[transaction_id]/confirm/route.ts | 4 +- .../src/api/admin/products/imports/route.ts | 12 +- 13 files changed, 868 insertions(+), 12 deletions(-) create mode 100644 .changeset/hot-flies-ring.md create mode 100644 integration-tests/http/__tests__/product/admin/product-imports.spec.ts create mode 100644 packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts create mode 100644 packages/core/core-flows/src/product/steps/process-import-chunks.ts create mode 100644 packages/core/core-flows/src/product/workflows/import-products-as-chunks.ts diff --git a/.changeset/hot-flies-ring.md b/.changeset/hot-flies-ring.md new file mode 100644 index 0000000000..ccb21bf40e --- /dev/null +++ b/.changeset/hot-flies-ring.md @@ -0,0 +1,8 @@ +--- +"@medusajs/medusa": patch +"@medusajs/core-flows": patch +"@medusajs/js-sdk": patch +"integration-tests-http": patch +--- + +feat: process import from pre-processed chunks diff --git a/integration-tests/http/__tests__/product/admin/product-export.spec.ts b/integration-tests/http/__tests__/product/admin/product-export.spec.ts index bab6279ee7..292c8adb3e 100644 --- a/integration-tests/http/__tests__/product/admin/product-export.spec.ts +++ b/integration-tests/http/__tests__/product/admin/product-export.spec.ts @@ -1,5 +1,6 @@ import { IEventBusModuleService } from "@medusajs/types" import { CommonEvents, Modules } from "@medusajs/utils" +import os from "os" import fs from "fs/promises" import { TestEventUtils, @@ -16,7 +17,7 @@ import { csv2json } from "json-2-csv" jest.setTimeout(50000) const getCSVContents = async (filePath: string) => { - const asLocalPath = filePath.replace("http://localhost:9000", process.cwd()) + const asLocalPath = filePath.replace("http://localhost:9000", os.tmpdir()) const fileContent = await fs.readFile(asLocalPath, { encoding: "utf-8" }) await fs.rm(path.dirname(asLocalPath), { recursive: true, force: true }) const csvRows = csv2json(fileContent) diff --git a/integration-tests/http/__tests__/product/admin/product-import.spec.ts b/integration-tests/http/__tests__/product/admin/product-import.spec.ts index 53006ac36d..297916ec2f 100644 --- a/integration-tests/http/__tests__/product/admin/product-import.spec.ts +++ b/integration-tests/http/__tests__/product/admin/product-import.spec.ts @@ -55,7 +55,6 @@ function prepareCSVForImport(fileContents: string, delimiter: string = ",") { } medusaIntegrationTestRunner({ - dbName: "bulk-uploads-local", testSuite: ({ dbConnection, getContainer, api }) => { let baseCollection let baseType diff --git a/integration-tests/http/__tests__/product/admin/product-imports.spec.ts b/integration-tests/http/__tests__/product/admin/product-imports.spec.ts new file mode 100644 index 0000000000..6a19814b71 --- /dev/null +++ b/integration-tests/http/__tests__/product/admin/product-imports.spec.ts @@ -0,0 +1,610 @@ +import fs from "fs/promises" +import path, { extname } from "path" +import { csv2json, json2csv } from "json-2-csv" +import { CommonEvents, Modules } from "@medusajs/utils" +import { IEventBusModuleService, IFileModuleService } from "@medusajs/types" +import { + TestEventUtils, + medusaIntegrationTestRunner, +} from "@medusajs/test-utils" +import { + adminHeaders, + createAdminUser, +} from "../../../../helpers/create-admin-user" +import { getProductFixture } from "../../../../helpers/fixtures" + +const UNALLOWED_EXPORTED_COLUMNS = [ + "Product Is Giftcard", + "Product Created At", + "Product Updated At", + "Product Deleted At", + "Variant Product Id", + "Variant Created At", + "Variant Updated At", + "Variant Deleted At", +] + +jest.setTimeout(50000) + +const getUploadReq = (file: { key: string; name: string; size: number }) => { + return { + body: { + file_key: file.key, + originalname: file.name, + extension: extname(file.name), + size: file.size, + mime_type: "text/csv", + }, + meta: { + headers: { + ...adminHeaders.headers, + }, + }, + } +} + +function prepareCSVForImport(fileContents: string, delimiter: string = ",") { + const CSVFileAsJSON = csv2json(fileContents, { + delimiter: { field: delimiter }, + }) + CSVFileAsJSON.forEach((row) => { + UNALLOWED_EXPORTED_COLUMNS.forEach((col) => { + delete row[col] + }) + }) + + return json2csv(CSVFileAsJSON) +} + +medusaIntegrationTestRunner({ + testSuite: ({ dbConnection, getContainer, api }) => { + let baseCollection + let baseType + let baseProduct + let baseRegion + let baseCategory + let baseTag1 + let baseTag2 + let baseTag3 + let newTag + let shippingProfile + + let eventBus: IEventBusModuleService + let fileModule: IFileModuleService + beforeAll(async () => { + eventBus = getContainer().resolve(Modules.EVENT_BUS) + fileModule = getContainer().resolve(Modules.FILE) + }) + + beforeEach(async () => { + await createAdminUser(dbConnection, adminHeaders, getContainer()) + baseCollection = ( + await api.post( + "/admin/collections", + { title: "base-collection" }, + adminHeaders + ) + ).data.collection + + baseType = ( + await api.post( + "/admin/product-types", + { value: "test-type" }, + adminHeaders + ) + ).data.product_type + + baseTag1 = ( + await api.post( + "/admin/product-tags", + { value: "tag-123" }, + adminHeaders + ) + ).data.product_tag + + baseTag2 = ( + await api.post( + "/admin/product-tags", + { value: "tag-123_1" }, + adminHeaders + ) + ).data.product_tag + + baseTag3 = ( + await api.post( + "/admin/product-tags", + { value: "tag-456" }, + adminHeaders + ) + ).data.product_tag + + newTag = ( + await api.post( + "/admin/product-tags", + { value: "new-tag" }, + adminHeaders + ) + ).data.product_tag + + shippingProfile = ( + await api.post( + `/admin/shipping-profiles`, + { name: "Test", type: "default" }, + adminHeaders + ) + ).data.shipping_profile + + baseProduct = ( + await api.post( + "/admin/products", + getProductFixture({ + title: "Base product", + tags: [{ id: baseTag1.id }, { id: baseTag2.id }], + shipping_profile_id: shippingProfile.id, + }), + adminHeaders + ) + ).data.product + + baseRegion = ( + await api.post( + "/admin/regions", + { + name: "Test region", + currency_code: "USD", + }, + adminHeaders + ) + ).data.region + + baseCategory = ( + await api.post( + "/admin/product-categories", + { name: "Test", is_internal: false, is_active: true }, + adminHeaders + ) + ).data.product_category + }) + + afterEach(() => { + ;(eventBus as any).eventEmitter_.removeAllListeners() + }) + + describe("POST /admin/products/imports", () => { + // We want to ensure files with different delimiters are supported + ;[ + { + file: "products-comma.csv", + name: "delimited with comma", + delimiter: ",", + }, + { + file: "products-semicolon.csv", + name: "delimited with semicolon", + delimiter: ";", + }, + ].forEach((testcase) => { + it(`should import a previously exported products CSV file ${testcase.name}`, async () => { + const subscriberExecution = TestEventUtils.waitSubscribersExecution( + `${Modules.NOTIFICATION}.notification.${CommonEvents.CREATED}`, + eventBus + ) + + let fileContent = await fs.readFile( + path.join(__dirname, "__fixtures__", testcase.file), + { encoding: "utf-8" } + ) + + fileContent = fileContent.replace( + /prod_01J44RRJZ3M5F63NY82434RNM5/g, + baseProduct.id + ) + fileContent = fileContent.replace( + /variant_01J44RRJZW1T9KQB6XG7Q6K61F/g, + baseProduct.variants[0].id + ) + + fileContent = fileContent.replace(/pcol_\w*\d*/g, baseCollection.id) + fileContent = fileContent.replace(/ptyp_\w*\d*/g, baseType.id) + fileContent = fileContent.replace(/tag-123/g, baseTag1.id) + fileContent = fileContent.replace(/tag-456/g, baseTag3.id) + fileContent = fileContent.replace(/new-tag/g, newTag.id) + + fileContent = fileContent.replace( + /import-shipping-profile*/g, + shippingProfile.id + ) + + const csvContents = prepareCSVForImport( + fileContent, + testcase.delimiter + ) + const { id } = await fileModule.createFiles({ + filename: "test.csv", + content: csvContents, + mimeType: "text/csv", + }) + + const { body, meta } = getUploadReq({ + name: "test.csv", + key: id, + size: csvContents.length, + }) + + // BREAKING: The batch endpoints moved to the domain routes (admin/batch-jobs -> /admin/products/import). The payload and response changed as well. + const batchJobRes = await api.post( + "/admin/products/imports", + body, + meta + ) + + const transactionId = batchJobRes.data.transaction_id + expect(transactionId).toBeTruthy() + expect(batchJobRes.data.summary).toEqual({ + toCreate: 1, + toUpdate: 1, + }) + + await api.post( + `/admin/products/imports/${transactionId}/confirm`, + {}, + meta + ) + + await subscriberExecution + const notifications = ( + await api.get("/admin/notifications", adminHeaders) + ).data.notifications + + expect(notifications.length).toBe(1) + expect(notifications[0]).toEqual( + expect.objectContaining({ + data: expect.objectContaining({ + title: "Product import", + description: `Product import of file test.csv completed successfully!`, + }), + }) + ) + + const dbProducts = (await api.get("/admin/products", adminHeaders)) + .data.products + + expect(dbProducts).toHaveLength(2) + expect(dbProducts[0]).toEqual( + expect.objectContaining({ + id: baseProduct.id, + handle: "base-product", + is_giftcard: false, + thumbnail: "test-image.png", + status: "draft", + description: "test-product-description\ntest line 2", + options: expect.arrayContaining([ + expect.objectContaining({ + title: "size", + values: expect.arrayContaining([ + expect.objectContaining({ + value: "large", + }), + expect.objectContaining({ + value: "small", + }), + ]), + }), + expect.objectContaining({ + title: "color", + values: expect.arrayContaining([ + expect.objectContaining({ + value: "green", + }), + ]), + }), + ]), + images: expect.arrayContaining([ + expect.objectContaining({ + url: "test-image.png", + }), + expect.objectContaining({ + url: "test-image-2.png", + }), + ]), + tags: [ + expect.objectContaining({ + id: baseTag1.id, + }), + expect.objectContaining({ + id: baseTag3.id, + }), + ], + type: expect.objectContaining({ + id: baseType.id, + }), + collection: expect.objectContaining({ + id: baseCollection.id, + }), + variants: expect.arrayContaining([ + expect.objectContaining({ + title: "Test variant", + allow_backorder: false, + manage_inventory: true, + prices: expect.arrayContaining([ + expect.objectContaining({ + currency_code: "dkk", + amount: 30, + }), + expect.objectContaining({ + currency_code: "eur", + amount: 45, + }), + expect.objectContaining({ + currency_code: "usd", + amount: 100, + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + value: "large", + }), + expect.objectContaining({ + value: "green", + }), + ]), + }), + expect.objectContaining({ + title: "Test variant 2", + allow_backorder: false, + manage_inventory: true, + prices: expect.arrayContaining([ + expect.objectContaining({ + currency_code: "dkk", + amount: 50, + }), + expect.objectContaining({ + currency_code: "eur", + amount: 65, + }), + expect.objectContaining({ + currency_code: "usd", + amount: 200, + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + value: "small", + }), + expect.objectContaining({ + value: "green", + }), + ]), + }), + ]), + created_at: expect.any(String), + updated_at: expect.any(String), + }) + ) + + expect(dbProducts[1]).toEqual( + expect.objectContaining({ + id: expect.any(String), + handle: "proposed-product", + is_giftcard: false, + thumbnail: "test-image.png", + status: "proposed", + description: "test-product-description", + options: expect.arrayContaining([ + expect.objectContaining({ + title: "size", + values: expect.arrayContaining([ + expect.objectContaining({ + value: "large", + }), + ]), + }), + expect.objectContaining({ + title: "color", + values: expect.arrayContaining([ + expect.objectContaining({ + value: "green", + }), + ]), + }), + ]), + images: expect.arrayContaining([ + expect.objectContaining({ + url: "test-image.png", + }), + expect.objectContaining({ + url: "test-image-2.png", + }), + ]), + tags: [ + expect.objectContaining({ + value: "new-tag", + }), + ], + type: expect.objectContaining({ + id: baseType.id, + }), + collection: null, + variants: expect.arrayContaining([ + expect.objectContaining({ + title: "Test variant", + allow_backorder: false, + manage_inventory: true, + prices: expect.arrayContaining([ + expect.objectContaining({ + currency_code: "dkk", + amount: 30, + }), + expect.objectContaining({ + currency_code: "eur", + amount: 45, + }), + expect.objectContaining({ + currency_code: "usd", + amount: 100, + }), + ]), + options: expect.arrayContaining([ + expect.objectContaining({ + value: "large", + }), + expect.objectContaining({ + value: "green", + }), + ]), + }), + ]), + created_at: expect.any(String), + updated_at: expect.any(String), + }) + ) + }) + }) + + it("should import product with categories", async () => { + const subscriberExecution = TestEventUtils.waitSubscribersExecution( + `${Modules.NOTIFICATION}.notification.${CommonEvents.CREATED}`, + eventBus + ) + + let fileContent = await fs.readFile( + path.join(__dirname, "__fixtures__", "product-with-categories.csv"), + { encoding: "utf-8" } + ) + + fileContent = fileContent.replace(/prod_\w*\d*/g, baseProduct.id) + fileContent = fileContent.replace(/pcol_\w*\d*/g, baseCollection.id) + fileContent = fileContent.replace(/ptyp_\w*\d*/g, baseType.id) + fileContent = fileContent.replace(/pcat_\w*\d*/g, baseCategory.id) + fileContent = fileContent.replace(/tag-123/g, baseTag1.id) + fileContent = fileContent.replace(/tag-456/g, baseTag3.id) + fileContent = fileContent.replace(/new-tag/g, newTag.id) + + fileContent = fileContent.replace( + /import-shipping-profile*/g, + shippingProfile.id + ) + + const csvContents = prepareCSVForImport(fileContent, ",") + const { id } = await fileModule.createFiles({ + filename: "test.csv", + content: csvContents, + mimeType: "text/csv", + }) + + const { body, meta } = getUploadReq({ + name: "test.csv", + key: id, + size: csvContents.length, + }) + + const batchJobRes = await api.post( + "/admin/products/imports", + body, + meta + ) + + const transactionId = batchJobRes.data.transaction_id + expect(transactionId).toBeTruthy() + expect(batchJobRes.data.summary).toEqual({ + toCreate: 0, + toUpdate: 1, + }) + + await api.post( + `/admin/products/imports/${transactionId}/confirm`, + {}, + meta + ) + + await subscriberExecution + const dbProducts = ( + await api.get("/admin/products?fields=*categories", adminHeaders) + ).data.products + + expect(dbProducts).toHaveLength(1) + expect(dbProducts[0]).toEqual( + expect.objectContaining({ + id: baseProduct.id, + categories: [expect.objectContaining({ id: baseCategory.id })], + }) + ) + }) + + it("should complain about non-existent fields being present in the CSV", async () => { + let fileContent = await fs.readFile( + path.join(__dirname, "__fixtures__", "unrelated-column.csv"), + { encoding: "utf-8" } + ) + + fileContent = fileContent.replace(/pcol_\w*\d*/g, baseCollection.id) + fileContent = fileContent.replace(/ptyp_\w*\d*/g, baseType.id) + fileContent = fileContent.replace(/tag-123/g, baseTag1.id) + fileContent = fileContent.replace(/tag-456/g, baseTag3.id) + fileContent = fileContent.replace(/new-tag/g, newTag.id) + + fileContent = fileContent.replace( + /import-shipping-profile*/g, + shippingProfile.id + ) + + const csvContents = prepareCSVForImport(fileContent, ",") + const { id } = await fileModule.createFiles({ + filename: "test.csv", + content: csvContents, + mimeType: "text/csv", + }) + + const { body, meta } = getUploadReq({ + name: "test.csv", + key: id, + size: csvContents.length, + }) + + const batchJobRes = await api + .post("/admin/products/imports", body, meta) + .catch((e) => e) + + expect(batchJobRes.response.data.message).toEqual( + 'Invalid column name(s) "Some field"' + ) + }) + + it("should successfully skip non-existent product fields being present in the CSV", async () => { + let fileContent = await fs.readFile( + path.join(__dirname, "__fixtures__", "invalid-column.csv"), + { encoding: "utf-8" } + ) + + fileContent = fileContent.replace(/pcol_\w*\d*/g, baseCollection.id) + fileContent = fileContent.replace(/ptyp_\w*\d*/g, baseType.id) + + fileContent = fileContent.replace( + /import-shipping-profile*/g, + shippingProfile.id + ) + + const csvContents = prepareCSVForImport(fileContent, ",") + const { id } = await fileModule.createFiles({ + filename: "test.csv", + content: csvContents, + mimeType: "text/csv", + }) + + const { body, meta } = getUploadReq({ + name: "test.csv", + key: id, + size: csvContents.length, + }) + + const batchJobRes = await api + .post("/admin/products/imports", body, meta) + .catch((e) => e) + + expect(batchJobRes.response.data.message).toEqual( + 'Invalid column name(s) "Product field"' + ) + }) + }) + }, +}) diff --git a/integration-tests/http/medusa-config.js b/integration-tests/http/medusa-config.js index 27673687d5..37cd6baf2d 100644 --- a/integration-tests/http/medusa-config.js +++ b/integration-tests/http/medusa-config.js @@ -65,6 +65,7 @@ module.exports = defineConfig({ options: { // This is the directory where we can reliably write in CI environments upload_dir: path.join(os.tmpdir(), "uploads"), + private_upload_dir: path.join(os.tmpdir(), "static"), }, }, ], diff --git a/packages/core/core-flows/src/product/steps/index.ts b/packages/core/core-flows/src/product/steps/index.ts index 9f299c60b6..eb4cde0156 100644 --- a/packages/core/core-flows/src/product/steps/index.ts +++ b/packages/core/core-flows/src/product/steps/index.ts @@ -26,3 +26,5 @@ export * from "./parse-product-csv" export * from "./wait-confirmation-product-import" export * from "./get-variant-availability" export * from "./normalize-products" +export * from "./normalize-products-to-chunks" +export * from "./process-import-chunks" diff --git a/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts b/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts new file mode 100644 index 0000000000..749996405f --- /dev/null +++ b/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts @@ -0,0 +1,71 @@ +import { HttpTypes } from "@medusajs/framework/types" +import { + CSVNormalizer, + Modules, + productValidators, +} from "@medusajs/framework/utils" +import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk" +import { convertCsvToJson } from "../utils" + +/** + * The CSV file content to parse. + */ +export type NormalizeProductCsvV1StepInput = string + +export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks" + +/** + * This step parses a CSV file holding products to import, returning the chunks + * to be processed. Each chunk is written to a file using the file provider. + * + * @example + * const data = normalizeCsvToChunksStep("products.csv") + */ +export const normalizeCsvToChunksStep = createStep( + normalizeCsvToChunksStepId, + async (fileKey: NormalizeProductCsvV1StepInput, { container }) => { + const file = container.resolve(Modules.FILE) + const contents = await file.getAsBuffer(fileKey) + + const csvProducts = convertCsvToJson< + ConstructorParameters[0][0] + >(contents.toString("utf-8")) + + const normalizer = new CSVNormalizer(csvProducts) + const products = normalizer.proccess() + + const create = Object.keys(products.toCreate).reduce< + HttpTypes.AdminCreateProduct[] + >((result, toCreateHandle) => { + result.push( + productValidators.CreateProduct.parse( + products.toCreate[toCreateHandle] + ) as HttpTypes.AdminCreateProduct + ) + return result + }, []) + + const update = Object.keys(products.toUpdate).reduce< + HttpTypes.AdminUpdateProduct & { id: string }[] + >((result, toUpdateId) => { + result.push( + productValidators.UpdateProduct.parse(products.toUpdate[toUpdateId]) + ) + return result + }, []) + + const { id } = await file.createFiles({ + filename: `${fileKey}.json`, + content: JSON.stringify({ create, update }), + mimeType: "application/json", + }) + + return new StepResponse({ + chunks: [id], + summary: { + toCreate: create.length, + toUpdate: update.length, + }, + }) + } +) diff --git a/packages/core/core-flows/src/product/steps/process-import-chunks.ts b/packages/core/core-flows/src/product/steps/process-import-chunks.ts new file mode 100644 index 0000000000..be3907c999 --- /dev/null +++ b/packages/core/core-flows/src/product/steps/process-import-chunks.ts @@ -0,0 +1,28 @@ +import { Modules } from "@medusajs/framework/utils" +import { createStep, StepResponse } from "@medusajs/framework/workflows-sdk" +import { batchProductsWorkflow } from "../workflows/batch-products" + +export const processImportChunksStepId = "process-import-chunks" + +/** + * This step parses a CSV file holding products to import, returning the products as + * objects that can be imported. + * + * @example + * const data = parseProductCsvStep("products.csv") + */ +export const processImportChunksStep = createStep( + processImportChunksStepId, + async (input: { chunks: string[] }, { container }) => { + const file = container.resolve(Modules.FILE) + + for (let chunk of input.chunks) { + const contents = await file.getAsBuffer(chunk) + await batchProductsWorkflow(container).run({ + input: JSON.parse(contents.toString("utf-8")), + }) + } + + return new StepResponse({ completed: true }) + } +) diff --git a/packages/core/core-flows/src/product/workflows/import-products-as-chunks.ts b/packages/core/core-flows/src/product/workflows/import-products-as-chunks.ts new file mode 100644 index 0000000000..ee0a34cc04 --- /dev/null +++ b/packages/core/core-flows/src/product/workflows/import-products-as-chunks.ts @@ -0,0 +1,137 @@ +import { WorkflowTypes } from "@medusajs/framework/types" +import { + WorkflowData, + WorkflowResponse, + createWorkflow, + transform, +} from "@medusajs/framework/workflows-sdk" +import { notifyOnFailureStep, sendNotificationsStep } from "../../notification" +import { + normalizeCsvToChunksStep, + processImportChunksStep, + waitConfirmationProductImportStep, +} from "../steps" + +export const importProductsAsChunksWorkflowId = "import-products-as-chunks" + +/** + * This workflow starts a product import from a CSV file in the background. It's used by the + * [Import Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimport). + * + * You can use this workflow within your custom workflows, allowing you to wrap custom logic around product import. + * For example, you can import products from another system. + * + * The workflow only starts the import, but you'll have to confirm it using the [Workflow Engine](https://docs.medusajs.com/resources/infrastructure-modules/workflow-engine). + * The below example shows how to confirm the import. + * + * @example + * To start the import of a CSV file: + * + * ```ts + * const { result, transaction } = await importProductsAsChunksWorkflow(container) + * .run({ + * input: { + * filename: "products.csv", + * fileKey: "products.csv", + * } + * }) + * ``` + * + * Notice that the workflow returns a `transaction.transactionId`. You'll use this ID to confirm the import afterwards. + * + * You confirm the import using the [Workflow Engine](https://docs.medusajs.com/resources/infrastructure-modules/workflow-engine). + * For example, in an API route: + * + * ```ts workflow={false} + * import { + * AuthenticatedMedusaRequest, + * MedusaResponse, + * } from "@medusajs/framework/http" + * import { + * importProductsAsChunksWorkflowId, + * waitConfirmationProductImportStepId, + * } from "@medusajs/core-flows" + * import { IWorkflowEngineService } from "@medusajs/framework/types" + * import { Modules, TransactionHandlerType } from "@medusajs/framework/utils" + * import { StepResponse } from "@medusajs/framework/workflows-sdk" + * + * export const POST = async ( + * req: AuthenticatedMedusaRequest, + * res: MedusaResponse + * ) => { + * const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + * Modules.WORKFLOW_ENGINE + * ) + * const transactionId = req.params.transaction_id + * + * await workflowEngineService.setStepSuccess({ + * idempotencyKey: { + * action: TransactionHandlerType.INVOKE, + * transactionId, + * stepId: waitConfirmationProductImportStepId, + * workflowId: importProductsAsChunksWorkflowId, + * }, + * stepResponse: new StepResponse(true), + * }) + * + * res.status(202).json({}) + * } + * ``` + * + * :::tip + * + * This example API route uses the same implementation as the [Confirm Product Import Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimportstransaction_idconfirm). + * + * ::: + * + * @summary + * + * Import products from a CSV file. + */ +export const importProductsAsChunksWorkflow = createWorkflow( + importProductsAsChunksWorkflowId, + ( + input: WorkflowData<{ fileKey: string; filename: string }> + ): WorkflowResponse => { + const batchRequest = normalizeCsvToChunksStep(input.fileKey) + + waitConfirmationProductImportStep() + + // Q: Can we somehow access the error from the step that threw here? Or in a compensate step at least? + const failureNotification = transform({ input }, (data) => { + return [ + { + // We don't need the recipient here for now, but if we want to push feed notifications to a specific user we could add it. + to: "", + channel: "feed", + template: "admin-ui", + data: { + title: "Product import", + description: `Failed to import products from file ${data.input.filename}`, + }, + }, + ] + }) + + notifyOnFailureStep(failureNotification) + processImportChunksStep(batchRequest) + + const notifications = transform({ input }, (data) => { + return [ + { + // We don't need the recipient here for now, but if we want to push feed notifications to a specific user we could add it. + to: "", + channel: "feed", + template: "admin-ui", + data: { + title: "Product import", + description: `Product import of file ${data.input.filename} completed successfully!`, + }, + }, + ] + }) + + sendNotificationsStep(notifications) + return new WorkflowResponse(batchRequest.summary) + } +) diff --git a/packages/core/core-flows/src/product/workflows/index.ts b/packages/core/core-flows/src/product/workflows/index.ts index 1eba8091fb..a6ad08bc98 100644 --- a/packages/core/core-flows/src/product/workflows/index.ts +++ b/packages/core/core-flows/src/product/workflows/index.ts @@ -22,4 +22,5 @@ export * from "./update-product-variants" export * from "./update-products" export * from "./export-products" export * from "./import-products" +export * from "./import-products-as-chunks" export * from "./upsert-variant-prices" diff --git a/packages/core/js-sdk/src/admin/product.ts b/packages/core/js-sdk/src/admin/product.ts index 9e3969ba13..2ce5955963 100644 --- a/packages/core/js-sdk/src/admin/product.ts +++ b/packages/core/js-sdk/src/admin/product.ts @@ -65,7 +65,7 @@ export class Product { * This method sends a request to the * [Create Product Import](https://docs.medusajs.com/api/admin#products_postproductsimports) * API route. - * + * * @version 2.8.0 * @ignore * @privateRemarks diff --git a/packages/medusa/src/api/admin/products/imports/[transaction_id]/confirm/route.ts b/packages/medusa/src/api/admin/products/imports/[transaction_id]/confirm/route.ts index cc05a745d4..fb0bf9e881 100644 --- a/packages/medusa/src/api/admin/products/imports/[transaction_id]/confirm/route.ts +++ b/packages/medusa/src/api/admin/products/imports/[transaction_id]/confirm/route.ts @@ -4,7 +4,7 @@ import { } from "@medusajs/framework/http" import { - importProductsWorkflowId, + importProductsAsChunksWorkflowId, waitConfirmationProductImportStepId, } from "@medusajs/core-flows" import { IWorkflowEngineService } from "@medusajs/framework/types" @@ -28,7 +28,7 @@ export const POST = async ( action: TransactionHandlerType.INVOKE, transactionId, stepId: waitConfirmationProductImportStepId, - workflowId: importProductsWorkflowId, + workflowId: importProductsAsChunksWorkflowId, }, stepResponse: new StepResponse(true), }) diff --git a/packages/medusa/src/api/admin/products/imports/route.ts b/packages/medusa/src/api/admin/products/imports/route.ts index c43fbe31f4..264e1a8c38 100644 --- a/packages/medusa/src/api/admin/products/imports/route.ts +++ b/packages/medusa/src/api/admin/products/imports/route.ts @@ -2,10 +2,9 @@ import { MedusaResponse, AuthenticatedMedusaRequest, } from "@medusajs/framework/http" -import { Modules } from "@medusajs/framework/utils" import type { HttpTypes } from "@medusajs/framework/types" -import { importProductsWorkflow } from "@medusajs/core-flows" import type { AdminImportProductsType } from "../validators" +import { importProductsAsChunksWorkflow } from "@medusajs/core-flows" /** * @version 2.8.0 @@ -14,13 +13,12 @@ export const POST = async ( req: AuthenticatedMedusaRequest, res: MedusaResponse ) => { - const fileProvider = req.scope.resolve(Modules.FILE) - const file = await fileProvider.getAsBuffer(req.validatedBody.file_key) - - const { result, transaction } = await importProductsWorkflow(req.scope).run({ + const { result, transaction } = await importProductsAsChunksWorkflow( + req.scope + ).run({ input: { filename: req.validatedBody.originalname, - fileContent: file.toString("utf-8"), + fileKey: req.validatedBody.file_key, }, })