feat: implement stream based processing of the files (#12574)

Fixes: FRMW-2960

This PR adds support for processing large CSV files by breaking them into chunks and processing one chunk at a time. This is how it works in a nutshell.

- The CSV file is read as a stream and each chunk of the stream is one CSV row.
- We read up to 1000 rows (plus a few more to ensure that the product variants of a product are not split across multiple chunks).
- Each chunk is then normalized using the `CSVNormalizer` and validated using zod schemas. If there is an error, the entire process will be aborted and the existing chunks will be deleted.
- Each chunk is written to a JSON file, so that we can process them later (after user confirms) without re-processing or validating the CSV file.
- The confirmation process will start consuming one chunk at a time and create/update products using the `batchProducts` workflow.

## To resume or not to resume processing of chunks

Let's imagine that during the processing of chunks, we find that chunk 3 leads to a database error. However, by that point we have already processed the first two chunks. How do we deal with this situation? The options are:

- We store at which chunk we failed and then during the re-upload we ignore chunks before the failed one. In my conversation with @olivermrbl we discovered that resuming will have to work with certain assumptions if we decide to implement it.
   - What if a user updates the CSV rows which are part of the already processed chunks? These changes will be ignored and they will never notice it.
- Resuming works if the file name is still the same. What if they made changes and saved the file with "Save as - New name"? In that case we will process the entire file anyway.
   - We will have to fetch the old workflow from the workflow engine using some `ilike` search, so that we can see at which chunk the last run failed for the given file.

Co-authored-by: Carlos R. L. Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com>
This commit is contained in:
Harminder Virk
2025-05-29 11:12:16 +05:30
committed by GitHub
parent 40e73c6ea2
commit cf0297f74a
12 changed files with 360 additions and 141 deletions

View File

@@ -1,11 +1,11 @@
import { HttpTypes } from "@medusajs/framework/types"
import { parse, Parser } from "csv-parse"
import { HttpTypes, IFileModuleService } from "@medusajs/framework/types"
import {
CSVNormalizer,
Modules,
CSVNormalizer,
productValidators,
} from "@medusajs/framework/utils"
import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk"
import { convertCsvToJson } from "../utils"
/**
* The CSV file content to parse.
@@ -14,6 +14,174 @@ export type NormalizeProductCsvV1StepInput = string
export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks"
/**
* Processes a chunk of products by writing them to a file. Later the
* file will be processed after the import has been confirmed.
*/
/**
 * Processes a chunk of products by writing them to a file. Later the
 * file will be processed after the import has been confirmed.
 *
 * @param file - File module service used to persist the chunk as JSON.
 * @param fileKey - Key under which the chunk is stored (".json" is appended).
 * @param csvRows - Pre-processed CSV rows making up this chunk.
 * @param currentRowNumber - Absolute CSV row number, forwarded to the
 * normalizer so validation errors report the correct row.
 * @returns The id of the created chunk file plus the counts of products
 * to create and to update.
 */
async function processChunk(
  file: IFileModuleService,
  fileKey: string,
  csvRows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[],
  currentRowNumber: number
) {
  const normalizer = new CSVNormalizer(csvRows)
  const products = normalizer.proccess(currentRowNumber)

  let create = Object.keys(products.toCreate).reduce<
    HttpTypes.AdminCreateProduct[]
  >((result, toCreateHandle) => {
    result.push(
      productValidators.CreateProduct.parse(
        products.toCreate[toCreateHandle]
      ) as HttpTypes.AdminCreateProduct
    )
    return result
  }, [])

  /**
   * The element type must be parenthesized: the previous
   * `HttpTypes.AdminUpdateProduct & { id: string }[]` parsed as an
   * intersection with an array type, not an array of intersections.
   */
  let update = Object.keys(products.toUpdate).reduce<
    (HttpTypes.AdminUpdateProduct & { id: string })[]
  >((result, toUpdateId) => {
    result.push(
      productValidators.UpdateProduct.parse(
        products.toUpdate[toUpdateId]
      ) as HttpTypes.AdminUpdateProduct & { id: string }
    )
    return result
  }, [])

  const toCreate = create.length
  const toUpdate = update.length

  const { id } = await file.createFiles({
    filename: `${fileKey}.json`,
    content: JSON.stringify({ create, update }),
    mimeType: "application/json",
  })

  /**
   * Release products from the memory
   */
  create = []
  update = []

  return {
    id,
    toCreate,
    toUpdate,
  }
}
/**
* Creates chunks by reading CSV rows from the stream
*/
/**
 * Creates chunks by reading CSV rows from the stream. Rows are collected
 * until the chunk threshold is hit, then flushed to a JSON file via
 * `processChunk`. A chunk boundary is only placed between rows that belong
 * to different products, so variants of one product never get split
 * across chunks.
 *
 * On any error, the stream is destroyed and every chunk written so far
 * (plus the source CSV file) is deleted before re-throwing.
 *
 * @param file - File module service used to persist chunks.
 * @param fileKey - Key of the uploaded CSV; chunk files are named
 * `${fileKey}-<n>.json`.
 * @param stream - csv-parse stream yielding one row object per iteration.
 * @returns Metadata for every written chunk.
 */
async function createChunks(
  file: IFileModuleService,
  fileKey: string,
  stream: Parser
) {
  /**
   * The row under process
   */
  let currentCSVRow = 0

  /**
   * Number of rows to process in a chunk. The rows count might go a little
   * up if there are more rows for the same product.
   */
  const rowsToRead = 1000

  /**
   * Current count of processed rows for a given chunk.
   */
  let rowsReadSoFar = 0

  /**
   * Validated chunks that have been written with the file
   * provider
   */
  const chunks: { id: string; toCreate: number; toUpdate: number }[] = []

  /**
   * Currently collected rows to be processed as one chunk
   */
  let rows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[] = []

  /**
   * The unique value for the current row. We need this value to scan
   * more rows after rowsToRead threshold has reached, but the upcoming
   * rows are part of the same product.
   */
  let currentRowUniqueValue: string | undefined

  try {
    for await (const row of stream) {
      rowsReadSoFar++
      currentCSVRow++
      const normalizedRow = CSVNormalizer.preProcess(row, currentCSVRow)
      const rowUniqueValue =
        normalizedRow["product id"] || normalizedRow["product handle"]

      /**
       * Reached rows threshold
       */
      if (rowsReadSoFar > rowsToRead) {
        /**
         * The current row unique value is not same as the previous row's
         * unique value. Hence we can break the chunk here and process
         * it.
         */
        if (rowUniqueValue !== currentRowUniqueValue) {
          chunks.push(
            await processChunk(
              file,
              `${fileKey}-${chunks.length + 1}`,
              rows,
              currentCSVRow
            )
          )
          /**
           * Reset for new chunk. The new chunk already contains the
           * current row, so the counter starts at 1 — resetting to 0
           * would make every subsequent chunk one row larger than
           * intended.
           */
          rows = [normalizedRow]
          rowsReadSoFar = 1
        } else {
          rows.push(normalizedRow)
        }
      } else {
        rows.push(normalizedRow)
      }

      currentRowUniqueValue = rowUniqueValue
    }

    /**
     * The file has finished and we have collected some rows that were
     * under the chunk rows size threshold.
     */
    if (rows.length) {
      chunks.push(
        await processChunk(
          file,
          `${fileKey}-${chunks.length + 1}`,
          rows,
          currentCSVRow
        )
      )
    }
  } catch (error) {
    if (!stream.destroyed) {
      stream.destroy()
    }
    /**
     * Cleanup in case of an error
     */
    await file.deleteFiles(chunks.map((chunk) => chunk.id).concat(fileKey))
    throw error
  }

  return chunks
}
/**
* This step parses a CSV file holding products to import, returning the chunks
* to be processed. Each chunk is written to a file using the file provider.
@@ -25,47 +193,35 @@ export const normalizeCsvToChunksStep = createStep(
normalizeCsvToChunksStepId,
async (fileKey: NormalizeProductCsvV1StepInput, { container }) => {
const file = container.resolve(Modules.FILE)
const contents = await file.getAsBuffer(fileKey)
const csvProducts = convertCsvToJson<
ConstructorParameters<typeof CSVNormalizer>[0][0]
>(contents.toString("utf-8"))
const normalizer = new CSVNormalizer(csvProducts)
const products = normalizer.proccess()
const create = Object.keys(products.toCreate).reduce<
HttpTypes.AdminCreateProduct[]
>((result, toCreateHandle) => {
result.push(
productValidators.CreateProduct.parse(
products.toCreate[toCreateHandle]
) as HttpTypes.AdminCreateProduct
const contents = await file.getDownloadStream(fileKey)
const chunks = await createChunks(
file,
fileKey,
contents.pipe(
parse({
columns: true,
skip_empty_lines: true,
})
)
return result
}, [])
)
const update = Object.keys(products.toUpdate).reduce<
HttpTypes.AdminUpdateProduct & { id: string }[]
>((result, toUpdateId) => {
result.push(
productValidators.UpdateProduct.parse(products.toUpdate[toUpdateId])
)
return result
}, [])
const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>(
(result, chunk) => {
result.toCreate = result.toCreate + chunk.toCreate
result.toUpdate = result.toUpdate + chunk.toUpdate
return result
},
{ toCreate: 0, toUpdate: 0 }
)
const { id } = await file.createFiles({
filename: `${fileKey}.json`,
content: JSON.stringify({ create, update }),
mimeType: "application/json",
})
/**
* Delete CSV file once we have the chunks
*/
await file.deleteFiles(fileKey)
return new StepResponse({
chunks: [id],
summary: {
toCreate: create.length,
toUpdate: update.length,
},
chunks,
summary,
})
}
)

View File

@@ -20,10 +20,10 @@ export const normalizeCsvStep = createStep(
normalizeCsvStepId,
async (fileContent: NormalizeProductCsvStepInput) => {
const csvProducts =
convertCsvToJson<ConstructorParameters<typeof CSVNormalizer>[0][0]>(
fileContent
)
const normalizer = new CSVNormalizer(csvProducts)
convertCsvToJson<Record<string, number | string | boolean>>(fileContent)
const normalizer = new CSVNormalizer(
csvProducts.map((row, index) => CSVNormalizer.preProcess(row, index + 1))
)
const products = normalizer.proccess()
const create = Object.keys(products.toCreate).reduce<

View File

@@ -13,14 +13,23 @@ export const processImportChunksStepId = "process-import-chunks"
*/
export const processImportChunksStep = createStep(
processImportChunksStepId,
async (input: { chunks: string[] }, { container }) => {
async (input: { chunks: { id: string }[] }, { container }) => {
const file = container.resolve(Modules.FILE)
for (let chunk of input.chunks) {
const contents = await file.getAsBuffer(chunk)
await batchProductsWorkflow(container).run({
input: JSON.parse(contents.toString("utf-8")),
})
try {
for (let chunk of input.chunks) {
const contents = await file.getAsBuffer(chunk.id)
let products = JSON.parse(contents.toString("utf-8"))
await batchProductsWorkflow(container).run({
input: products,
})
products = undefined
}
} finally {
/**
* Delete chunks regardless of the import status
*/
await file.deleteFiles(input.chunks.map((chunk) => chunk.id))
}
return new StepResponse({ completed: true })