feat: process import from pre-processed chunks (#12527)

Fixes: FRMW-2968

In this PR we have done two major things.

- First, we remove storing CSV contents within the workflow storage and neither store the JSON payloads to be created/updated in workflows. Earlier, they all were workflow inputs, hence were stored in the workflow
- Introduce a naive concept of chunks and process chunks one by one. The next PR making chunking a bit more robust while using streams, adding ability to resume from the failed chunk and so on.

> [!IMPORTANT]  
> The new endpoint `/admin/product/imports` is not in use yet. But it will be after the next (final) PR.

## Old context in workflow storage

![CleanShot 2025-05-19 at 17 11 08@2x](https://github.com/user-attachments/assets/798bdcc9-a368-4c1f-afdd-2a77f5ce43e0)

## New context in workflow storage

![CleanShot 2025-05-19 at 17 15 08@2x](https://github.com/user-attachments/assets/0463d035-403f-4600-a9cd-5af24d5fee7c)
This commit is contained in:
Harminder Virk
2025-05-20 18:03:18 +05:30
committed by GitHub
parent 3e5794d657
commit fca5ad77b4
13 changed files with 868 additions and 12 deletions

View File

@@ -26,3 +26,5 @@ export * from "./parse-product-csv"
export * from "./wait-confirmation-product-import"
export * from "./get-variant-availability"
export * from "./normalize-products"
export * from "./normalize-products-to-chunks"
export * from "./process-import-chunks"

View File

@@ -0,0 +1,71 @@
import { HttpTypes } from "@medusajs/framework/types"
import {
CSVNormalizer,
Modules,
productValidators,
} from "@medusajs/framework/utils"
import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk"
import { convertCsvToJson } from "../utils"
/**
* The CSV file content to parse.
*/
export type NormalizeProductCsvV1StepInput = string
export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks"
/**
* This step parses a CSV file holding products to import, returning the chunks
* to be processed. Each chunk is written to a file using the file provider.
*
* @example
* const data = normalizeCsvToChunksStep("products.csv")
*/
export const normalizeCsvToChunksStep = createStep(
normalizeCsvToChunksStepId,
async (fileKey: NormalizeProductCsvV1StepInput, { container }) => {
const file = container.resolve(Modules.FILE)
const contents = await file.getAsBuffer(fileKey)
const csvProducts = convertCsvToJson<
ConstructorParameters<typeof CSVNormalizer>[0][0]
>(contents.toString("utf-8"))
const normalizer = new CSVNormalizer(csvProducts)
const products = normalizer.proccess()
const create = Object.keys(products.toCreate).reduce<
HttpTypes.AdminCreateProduct[]
>((result, toCreateHandle) => {
result.push(
productValidators.CreateProduct.parse(
products.toCreate[toCreateHandle]
) as HttpTypes.AdminCreateProduct
)
return result
}, [])
const update = Object.keys(products.toUpdate).reduce<
HttpTypes.AdminUpdateProduct & { id: string }[]
>((result, toUpdateId) => {
result.push(
productValidators.UpdateProduct.parse(products.toUpdate[toUpdateId])
)
return result
}, [])
const { id } = await file.createFiles({
filename: `${fileKey}.json`,
content: JSON.stringify({ create, update }),
mimeType: "application/json",
})
return new StepResponse({
chunks: [id],
summary: {
toCreate: create.length,
toUpdate: update.length,
},
})
}
)

View File

@@ -0,0 +1,28 @@
import { Modules } from "@medusajs/framework/utils"
import { createStep, StepResponse } from "@medusajs/framework/workflows-sdk"
import { batchProductsWorkflow } from "../workflows/batch-products"
export const processImportChunksStepId = "process-import-chunks"
/**
* This step parses a CSV file holding products to import, returning the products as
* objects that can be imported.
*
* @example
* const data = parseProductCsvStep("products.csv")
*/
export const processImportChunksStep = createStep(
processImportChunksStepId,
async (input: { chunks: string[] }, { container }) => {
const file = container.resolve(Modules.FILE)
for (let chunk of input.chunks) {
const contents = await file.getAsBuffer(chunk)
await batchProductsWorkflow(container).run({
input: JSON.parse(contents.toString("utf-8")),
})
}
return new StepResponse({ completed: true })
}
)

View File

@@ -0,0 +1,137 @@
import { WorkflowTypes } from "@medusajs/framework/types"
import {
WorkflowData,
WorkflowResponse,
createWorkflow,
transform,
} from "@medusajs/framework/workflows-sdk"
import { notifyOnFailureStep, sendNotificationsStep } from "../../notification"
import {
normalizeCsvToChunksStep,
processImportChunksStep,
waitConfirmationProductImportStep,
} from "../steps"
export const importProductsAsChunksWorkflowId = "import-products-as-chunks"
/**
* This workflow starts a product import from a CSV file in the background. It's used by the
* [Import Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimport).
*
* You can use this workflow within your custom workflows, allowing you to wrap custom logic around product import.
* For example, you can import products from another system.
*
* The workflow only starts the import, but you'll have to confirm it using the [Workflow Engine](https://docs.medusajs.com/resources/infrastructure-modules/workflow-engine).
* The below example shows how to confirm the import.
*
* @example
* To start the import of a CSV file:
*
* ```ts
* const { result, transaction } = await importProductsAsChunksWorkflow(container)
* .run({
* input: {
* filename: "products.csv",
* fileKey: "products.csv",
* }
* })
* ```
*
* Notice that the workflow returns a `transaction.transactionId`. You'll use this ID to confirm the import afterwards.
*
* You confirm the import using the [Workflow Engine](https://docs.medusajs.com/resources/infrastructure-modules/workflow-engine).
* For example, in an API route:
*
* ```ts workflow={false}
* import {
* AuthenticatedMedusaRequest,
* MedusaResponse,
* } from "@medusajs/framework/http"
* import {
* importProductsAsChunksWorkflowId,
* waitConfirmationProductImportStepId,
* } from "@medusajs/core-flows"
* import { IWorkflowEngineService } from "@medusajs/framework/types"
* import { Modules, TransactionHandlerType } from "@medusajs/framework/utils"
* import { StepResponse } from "@medusajs/framework/workflows-sdk"
*
* export const POST = async (
* req: AuthenticatedMedusaRequest,
* res: MedusaResponse
* ) => {
* const workflowEngineService: IWorkflowEngineService = req.scope.resolve(
* Modules.WORKFLOW_ENGINE
* )
* const transactionId = req.params.transaction_id
*
* await workflowEngineService.setStepSuccess({
* idempotencyKey: {
* action: TransactionHandlerType.INVOKE,
* transactionId,
* stepId: waitConfirmationProductImportStepId,
* workflowId: importProductsAsChunksWorkflowId,
* },
* stepResponse: new StepResponse(true),
* })
*
* res.status(202).json({})
* }
* ```
*
* :::tip
*
* This example API route uses the same implementation as the [Confirm Product Import Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsimportstransaction_idconfirm).
*
* :::
*
* @summary
*
* Import products from a CSV file.
*/
export const importProductsAsChunksWorkflow = createWorkflow(
importProductsAsChunksWorkflowId,
(
input: WorkflowData<{ fileKey: string; filename: string }>
): WorkflowResponse<WorkflowTypes.ProductWorkflow.ImportProductsSummary> => {
const batchRequest = normalizeCsvToChunksStep(input.fileKey)
waitConfirmationProductImportStep()
// Q: Can we somehow access the error from the step that threw here? Or in a compensate step at least?
const failureNotification = transform({ input }, (data) => {
return [
{
// We don't need the recipient here for now, but if we want to push feed notifications to a specific user we could add it.
to: "",
channel: "feed",
template: "admin-ui",
data: {
title: "Product import",
description: `Failed to import products from file ${data.input.filename}`,
},
},
]
})
notifyOnFailureStep(failureNotification)
processImportChunksStep(batchRequest)
const notifications = transform({ input }, (data) => {
return [
{
// We don't need the recipient here for now, but if we want to push feed notifications to a specific user we could add it.
to: "",
channel: "feed",
template: "admin-ui",
data: {
title: "Product import",
description: `Product import of file ${data.input.filename} completed successfully!`,
},
},
]
})
sendNotificationsStep(notifications)
return new WorkflowResponse(batchRequest.summary)
}
)

View File

@@ -22,4 +22,5 @@ export * from "./update-product-variants"
export * from "./update-products"
export * from "./export-products"
export * from "./import-products"
export * from "./import-products-as-chunks"
export * from "./upsert-variant-prices"