From cf0297f74af1b043363ffbd5a23ca0933097a69d Mon Sep 17 00:00:00 2001
From: Harminder Virk
Date: Thu, 29 May 2025 11:12:16 +0530
Subject: [PATCH] feat: implement stream based processing of the files (#12574)

Fixes: FRMW-2960

This PR adds support for processing large CSV files by breaking them into chunks and processing one chunk at a time. In a nutshell, this is how it works.

- The CSV file is read as a stream, and each chunk of the stream is one CSV row.
- We read up to 1000 rows (plus a few more to ensure that the variants of a product are not split across multiple chunks).
- Each chunk is then normalized using the `CSVNormalizer` and validated using zod schemas. If there is an error, the entire process is aborted and the existing chunks are deleted.
- Each chunk is written to a JSON file, so that we can process it later (after the user confirms) without re-parsing or re-validating the CSV file.
- The confirmation process consumes one chunk at a time and creates/updates products using the `batchProducts` workflow.

(A simplified sketch of this chunking loop is included at the end of this patch.)

## To resume or not to resume processing of chunks

Let's imagine that, while processing chunks, chunk 3 leads to a database error. By this point we have already processed the first two chunks. How do we deal with this situation? Options are:

- We store the chunk at which we failed and then, during the re-upload, we skip the chunks before the failed one.

In my conversation with @olivermrbl we discovered that resuming would have to rely on certain assumptions if we decide to implement it.

- What if a user updates CSV rows that are part of already processed chunks? Those changes will be ignored, and the user will never notice it.
- Resuming only works if the file name stays the same. What if the user makes changes and saves the file via "Save as - New name"? In that case we will process the entire file anyway.
- We will have to fetch the old workflow from the workflow engine using some `ilike` search, so that we can see at which chunk the last run failed for the given file.

Co-authored-by: Carlos R. L.
Rodrigues <37986729+carlos-r-l-rodrigues@users.noreply.github.com> --- .changeset/selfish-ways-provide.md | 7 + packages/core/core-flows/package.json | 1 + .../steps/normalize-products-to-chunks.ts | 234 +++++++++++++++--- .../src/product/steps/normalize-products.ts | 8 +- .../product/steps/process-import-chunks.ts | 21 +- packages/core/framework/package.json | 3 +- .../src/http/middlewares/error-handler.ts | 10 + packages/core/utils/src/common/index.ts | 1 + .../utils/src/common/normalize-csv-value.ts | 10 + .../product/__tests__/csv-normalizer.spec.ts | 32 ++- .../core/utils/src/product/csv-normalizer.ts | 156 ++++++------ yarn.lock | 18 ++ 12 files changed, 360 insertions(+), 141 deletions(-) create mode 100644 .changeset/selfish-ways-provide.md create mode 100644 packages/core/utils/src/common/normalize-csv-value.ts diff --git a/.changeset/selfish-ways-provide.md b/.changeset/selfish-ways-provide.md new file mode 100644 index 0000000000..9c221af55a --- /dev/null +++ b/.changeset/selfish-ways-provide.md @@ -0,0 +1,7 @@ +--- +"@medusajs/core-flows": patch +"@medusajs/framework": patch +"@medusajs/utils": patch +--- + +feat: implement stream based processing of the files diff --git a/packages/core/core-flows/package.json b/packages/core/core-flows/package.json index 8769bbadd6..4115378564 100644 --- a/packages/core/core-flows/package.json +++ b/packages/core/core-flows/package.json @@ -41,6 +41,7 @@ "typescript": "^5.6.2" }, "dependencies": { + "csv-parse": "^5.6.0", "json-2-csv": "^5.5.4" }, "peerDependencies": { diff --git a/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts b/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts index 749996405f..e3efaeb17a 100644 --- a/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts +++ b/packages/core/core-flows/src/product/steps/normalize-products-to-chunks.ts @@ -1,11 +1,11 @@ -import { HttpTypes } from "@medusajs/framework/types" +import { parse, Parser } from "csv-parse" +import { HttpTypes, IFileModuleService } from "@medusajs/framework/types" import { - CSVNormalizer, Modules, + CSVNormalizer, productValidators, } from "@medusajs/framework/utils" import { StepResponse, createStep } from "@medusajs/framework/workflows-sdk" -import { convertCsvToJson } from "../utils" /** * The CSV file content to parse. @@ -14,6 +14,174 @@ export type NormalizeProductCsvV1StepInput = string export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks" +/** + * Processes a chunk of products by writing them to a file. Later the + * file will be processed after the import has been confirmed. 
+ */ +async function processChunk( + file: IFileModuleService, + fileKey: string, + csvRows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[], + currentRowNumber: number +) { + const normalizer = new CSVNormalizer(csvRows) + const products = normalizer.proccess(currentRowNumber) + + let create = Object.keys(products.toCreate).reduce< + HttpTypes.AdminCreateProduct[] + >((result, toCreateHandle) => { + result.push( + productValidators.CreateProduct.parse( + products.toCreate[toCreateHandle] + ) as HttpTypes.AdminCreateProduct + ) + return result + }, []) + + let update = Object.keys(products.toUpdate).reduce< + HttpTypes.AdminUpdateProduct & { id: string }[] + >((result, toUpdateId) => { + result.push( + productValidators.UpdateProduct.parse(products.toUpdate[toUpdateId]) + ) + return result + }, []) + + const toCreate = create.length + const toUpdate = update.length + + const { id } = await file.createFiles({ + filename: `${fileKey}.json`, + content: JSON.stringify({ create, update }), + mimeType: "application/json", + }) + + /** + * Release products from the memory + */ + create = [] + update = [] + + return { + id, + toCreate, + toUpdate, + } +} + +/** + * Creates chunks by reading CSV rows from the stream + */ +async function createChunks( + file: IFileModuleService, + fileKey: string, + stream: Parser +) { + /** + * The row under process + */ + let currentCSVRow = 0 + + /** + * Number of rows to process in a chunk. The rows count might go a little + * up if there are more rows for the same product. + */ + const rowsToRead = 1000 + + /** + * Current count of processed rows for a given chunk. + */ + let rowsReadSoFar = 0 + + /** + * Validated chunks that have been written with the file + * provider + */ + const chunks: { id: string; toCreate: number; toUpdate: number }[] = [] + + /** + * Currently collected rows to be processed as one chunk + */ + let rows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[] = [] + + /** + * The unique value for the current row. We need this value to scan + * more rows after rowsToRead threshold has reached, but the upcoming + * rows are part of the same product. + */ + let currentRowUniqueValue: string | undefined + + try { + for await (const row of stream) { + rowsReadSoFar++ + currentCSVRow++ + const normalizedRow = CSVNormalizer.preProcess(row, currentCSVRow) + const rowValueValue = + normalizedRow["product id"] || normalizedRow["product handle"] + + /** + * Reached rows threshold + */ + if (rowsReadSoFar > rowsToRead) { + /** + * The current row unique value is not same as the previous row's + * unique value. Hence we can break the chunk here and process + * it. + */ + if (rowValueValue !== currentRowUniqueValue) { + chunks.push( + await processChunk( + file, + `${fileKey}-${chunks.length + 1}`, + rows, + currentCSVRow + ) + ) + + /** + * Reset for new row + */ + rows = [normalizedRow] + rowsReadSoFar = 0 + } else { + rows.push(normalizedRow) + } + } else { + rows.push(normalizedRow) + } + + currentRowUniqueValue = rowValueValue + } + + /** + * The file has finished and we have collected some rows that were + * under the chunk rows size threshold. 
+ */ + if (rows.length) { + chunks.push( + await processChunk( + file, + `${fileKey}-${chunks.length + 1}`, + rows, + currentCSVRow + ) + ) + } + } catch (error) { + if (!stream.destroyed) { + stream.destroy() + } + + /** + * Cleanup in case of an error + */ + await file.deleteFiles(chunks.map((chunk) => chunk.id).concat(fileKey)) + throw error + } + + return chunks +} + /** * This step parses a CSV file holding products to import, returning the chunks * to be processed. Each chunk is written to a file using the file provider. @@ -25,47 +193,35 @@ export const normalizeCsvToChunksStep = createStep( normalizeCsvToChunksStepId, async (fileKey: NormalizeProductCsvV1StepInput, { container }) => { const file = container.resolve(Modules.FILE) - const contents = await file.getAsBuffer(fileKey) - - const csvProducts = convertCsvToJson< - ConstructorParameters[0][0] - >(contents.toString("utf-8")) - - const normalizer = new CSVNormalizer(csvProducts) - const products = normalizer.proccess() - - const create = Object.keys(products.toCreate).reduce< - HttpTypes.AdminCreateProduct[] - >((result, toCreateHandle) => { - result.push( - productValidators.CreateProduct.parse( - products.toCreate[toCreateHandle] - ) as HttpTypes.AdminCreateProduct + const contents = await file.getDownloadStream(fileKey) + const chunks = await createChunks( + file, + fileKey, + contents.pipe( + parse({ + columns: true, + skip_empty_lines: true, + }) ) - return result - }, []) + ) - const update = Object.keys(products.toUpdate).reduce< - HttpTypes.AdminUpdateProduct & { id: string }[] - >((result, toUpdateId) => { - result.push( - productValidators.UpdateProduct.parse(products.toUpdate[toUpdateId]) - ) - return result - }, []) + const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>( + (result, chunk) => { + result.toCreate = result.toCreate + chunk.toCreate + result.toUpdate = result.toUpdate + chunk.toUpdate + return result + }, + { toCreate: 0, toUpdate: 0 } + ) - const { id } = await file.createFiles({ - filename: `${fileKey}.json`, - content: JSON.stringify({ create, update }), - mimeType: "application/json", - }) + /** + * Delete CSV file once we have the chunks + */ + await file.deleteFiles(fileKey) return new StepResponse({ - chunks: [id], - summary: { - toCreate: create.length, - toUpdate: update.length, - }, + chunks, + summary, }) } ) diff --git a/packages/core/core-flows/src/product/steps/normalize-products.ts b/packages/core/core-flows/src/product/steps/normalize-products.ts index ee4c85d234..b8cc080d76 100644 --- a/packages/core/core-flows/src/product/steps/normalize-products.ts +++ b/packages/core/core-flows/src/product/steps/normalize-products.ts @@ -20,10 +20,10 @@ export const normalizeCsvStep = createStep( normalizeCsvStepId, async (fileContent: NormalizeProductCsvStepInput) => { const csvProducts = - convertCsvToJson[0][0]>( - fileContent - ) - const normalizer = new CSVNormalizer(csvProducts) + convertCsvToJson>(fileContent) + const normalizer = new CSVNormalizer( + csvProducts.map((row, index) => CSVNormalizer.preProcess(row, index + 1)) + ) const products = normalizer.proccess() const create = Object.keys(products.toCreate).reduce< diff --git a/packages/core/core-flows/src/product/steps/process-import-chunks.ts b/packages/core/core-flows/src/product/steps/process-import-chunks.ts index be3907c999..c256a84f6a 100644 --- a/packages/core/core-flows/src/product/steps/process-import-chunks.ts +++ b/packages/core/core-flows/src/product/steps/process-import-chunks.ts @@ -13,14 +13,23 @@ export 
const processImportChunksStepId = "process-import-chunks" */ export const processImportChunksStep = createStep( processImportChunksStepId, - async (input: { chunks: string[] }, { container }) => { + async (input: { chunks: { id: string }[] }, { container }) => { const file = container.resolve(Modules.FILE) - for (let chunk of input.chunks) { - const contents = await file.getAsBuffer(chunk) - await batchProductsWorkflow(container).run({ - input: JSON.parse(contents.toString("utf-8")), - }) + try { + for (let chunk of input.chunks) { + const contents = await file.getAsBuffer(chunk.id) + let products = JSON.parse(contents.toString("utf-8")) + await batchProductsWorkflow(container).run({ + input: products, + }) + products = undefined + } + } finally { + /** + * Delete chunks regardless of the import status + */ + await file.deleteFiles(input.chunks.map((chunk) => chunk.id)) } return new StepResponse({ completed: true }) diff --git a/packages/core/framework/package.json b/packages/core/framework/package.json index 6658d1db55..9dc8843055 100644 --- a/packages/core/framework/package.json +++ b/packages/core/framework/package.json @@ -97,7 +97,8 @@ "morgan": "^1.9.1", "path-to-regexp": "^0.1.10", "tsconfig-paths": "^4.2.0", - "zod": "3.22.4" + "zod": "3.22.4", + "zod-validation-error": "^3.4.1" }, "peerDependencies": { "@aws-sdk/client-dynamodb": "^3.218.0", diff --git a/packages/core/framework/src/http/middlewares/error-handler.ts b/packages/core/framework/src/http/middlewares/error-handler.ts index 24cb12656f..478dad206c 100644 --- a/packages/core/framework/src/http/middlewares/error-handler.ts +++ b/packages/core/framework/src/http/middlewares/error-handler.ts @@ -1,3 +1,4 @@ +import { fromZodIssue } from "zod-validation-error" import { NextFunction, ErrorRequestHandler, Response } from "express" import { ContainerRegistrationKeys, MedusaError } from "@medusajs/utils" @@ -75,6 +76,15 @@ export function errorHandler() { break } + if ("issues" in err && Array.isArray(err.issues)) { + const messages = err.issues.map((issue) => fromZodIssue(issue).toString()) + res.status(statusCode).json({ + type: MedusaError.Types.INVALID_DATA, + message: messages.join("\n"), + }) + return + } + res.status(statusCode).json(errObj) } as unknown as ErrorRequestHandler } diff --git a/packages/core/utils/src/common/index.ts b/packages/core/utils/src/common/index.ts index b3fb55eba4..4dc8e03db6 100644 --- a/packages/core/utils/src/common/index.ts +++ b/packages/core/utils/src/common/index.ts @@ -88,3 +88,4 @@ export * from "./upper-case-first" export * from "./validate-handle" export * from "./validate-module-name" export * from "./wrap-handler" +export * from "./normalize-csv-value" diff --git a/packages/core/utils/src/common/normalize-csv-value.ts b/packages/core/utils/src/common/normalize-csv-value.ts new file mode 100644 index 0000000000..37a2920ad8 --- /dev/null +++ b/packages/core/utils/src/common/normalize-csv-value.ts @@ -0,0 +1,10 @@ +/** + * Normalizes a CSV value by removing the leading "\r" from the + * value. 
+ */ +export function normalizeCSVValue(value: T): T { + if (typeof value === "string") { + return value.replace(/\\r$/, "").trim() as T + } + return value +} diff --git a/packages/core/utils/src/product/__tests__/csv-normalizer.spec.ts b/packages/core/utils/src/product/__tests__/csv-normalizer.spec.ts index 7bee7957bd..0fbfdb33f8 100644 --- a/packages/core/utils/src/product/__tests__/csv-normalizer.spec.ts +++ b/packages/core/utils/src/product/__tests__/csv-normalizer.spec.ts @@ -10,16 +10,16 @@ async function loadFixtureFile(fileName: string) { describe("CSV processor", () => { it("should error when both Product Id and Handle are missing", async () => { - const processor = new CSVNormalizer([{}]) - - expect(() => processor.proccess()).toThrow( - "Row 1: Missing product id and handle. One of them are required to process the row" + expect(() => CSVNormalizer.preProcess({}, 1)).toThrow( + "Row 1: Missing product id and handle. One of these columns are required to process the row" ) }) it("should process a CSV row", async () => { - const csvData = await loadFixtureFile("single-row-create.json") - const processor = new CSVNormalizer(csvData) + const csvData: any[] = await loadFixtureFile("single-row-create.json") + const processor = new CSVNormalizer( + csvData.map((row, index) => CSVNormalizer.preProcess(row, index + 1)) + ) const products = processor.proccess() expect(products).toMatchInlineSnapshot(` @@ -87,8 +87,12 @@ describe("CSV processor", () => { }) it("should process multiple CSV rows for the same product", async () => { - const csvData = await loadFixtureFile("same-product-multiple-rows.json") - const processor = new CSVNormalizer(csvData) + const csvData: any[] = await loadFixtureFile( + "same-product-multiple-rows.json" + ) + const processor = new CSVNormalizer( + csvData.map((row, index) => CSVNormalizer.preProcess(row, index + 1)) + ) const products = processor.proccess() expect(products).toMatchInlineSnapshot(` @@ -200,10 +204,12 @@ describe("CSV processor", () => { }) it("should process multiple CSV rows where each variant uses different options", async () => { - const csvData = await loadFixtureFile( + const csvData: any[] = await loadFixtureFile( "same-product-multiple-variant-options.json" ) - const processor = new CSVNormalizer(csvData) + const processor = new CSVNormalizer( + csvData.map((row, index) => CSVNormalizer.preProcess(row, index + 1)) + ) const products = processor.proccess() expect(products).toMatchInlineSnapshot(` @@ -325,10 +331,12 @@ describe("CSV processor", () => { }) it("should process multiple CSV rows with multiple products and variants", async () => { - const csvData = await loadFixtureFile( + const csvData: any[] = await loadFixtureFile( "multiple-products-multiple-variants.json" ) - const processor = new CSVNormalizer(csvData) + const processor = new CSVNormalizer( + csvData.map((row, index) => CSVNormalizer.preProcess(row, index + 1)) + ) const products = processor.proccess() expect(products).toMatchInlineSnapshot(` diff --git a/packages/core/utils/src/product/csv-normalizer.ts b/packages/core/utils/src/product/csv-normalizer.ts index 97bfc2a9e8..f723acd309 100644 --- a/packages/core/utils/src/product/csv-normalizer.ts +++ b/packages/core/utils/src/product/csv-normalizer.ts @@ -3,6 +3,7 @@ import { tryConvertToNumber, tryConvertToBoolean, MedusaError, + normalizeCSVValue, } from "../common" import { AdminCreateProduct, AdminCreateProductVariant } from "@medusajs/types" @@ -17,6 +18,20 @@ type ColumnProcessor = ( output: Output ) => void +type 
NormalizedRow = + | (Record & { + "product id": string + "product handle": string + }) + | { + "product id"?: string + "product handle": string + } + | { + "product id": string + "product handle"?: string + } + /** * Creates an error with the CSV row number */ @@ -27,23 +42,12 @@ function createError(rowNumber: number, message: string) { ) } -/** - * Normalizes a CSV value by removing the leading "\r" from the - * value. - */ -function normalizeValue(value: T): T { - if (typeof value === "string") { - return value.replace(/\\r$/, "").trim() as T - } - return value -} - /** * Parses different patterns to extract variant price iso * and the region name. The iso is converted to lowercase */ function parseVariantPriceColumn(columnName: string, rowNumber: number) { - const normalizedValue = normalizeValue(columnName) + const normalizedValue = columnName const potentialRegion = /\[(.*)\]/g.exec(normalizedValue)?.[1] const iso = normalizedValue.split(" ").pop() @@ -68,7 +72,7 @@ function processAsString( outputKey: keyof Output ): ColumnProcessor { return (csvRow, _, __, output) => { - const value = normalizeValue(csvRow[inputKey]) + const value = csvRow[inputKey] if (isPresent(value)) { output[outputKey as any] = value } @@ -83,7 +87,7 @@ function processAsBoolean( outputKey: keyof Output ): ColumnProcessor { return (csvRow, _, __, output) => { - const value = normalizeValue(csvRow[inputKey]) + const value = csvRow[inputKey] if (isPresent(value)) { output[outputKey as any] = tryConvertToBoolean(value, value) } @@ -99,7 +103,7 @@ function processAsNumber( options?: { asNumericString: boolean } ): ColumnProcessor { return (csvRow, _, rowNumber, output) => { - const value = normalizeValue(csvRow[inputKey]) + const value = csvRow[inputKey] if (isPresent(value)) { const numericValue = tryConvertToNumber(value) if (numericValue === undefined) { @@ -135,7 +139,7 @@ function processAsCounterValue>( rowColumns .filter((rowKey) => inputMatcher.test(rowKey)) .forEach((rowKey) => { - const value = normalizeValue(csvRow[rowKey]) + const value = csvRow[rowKey] if (!existingIds.includes(value) && isPresent(value)) { output[outputKey].push({ [arrayItemKey]: value }) } @@ -243,7 +247,7 @@ const variantStaticColumns: { "variant origin country", "origin_country" ), - "variant variant rank": processAsString( + "variant variant rank": processAsNumber( "variant variant rank", "variant_rank" ), @@ -268,7 +272,7 @@ const variantWildcardColumns: { pricesColumns.forEach((columnName) => { const { iso } = parseVariantPriceColumn(columnName, rowNumber) - const value = normalizeValue(csvRow[columnName]) + const value = csvRow[columnName] const numericValue = tryConvertToNumber(value) if (numericValue === undefined) { @@ -298,13 +302,13 @@ const optionColumns: { "variant option": (csvRow, rowColumns, rowNumber, output) => { const matcher = /variant option \d+ name/ const optionNameColumns = rowColumns.filter((rowKey) => { - return matcher.test(rowKey) && isPresent(normalizeValue(csvRow[rowKey])) + return matcher.test(rowKey) && isPresent(csvRow[rowKey]) }) output["options"] = optionNameColumns.map((columnName) => { const [, , counter] = columnName.split(" ") - const key = normalizeValue(csvRow[columnName]) - const value = normalizeValue(csvRow[`variant option ${counter} value`]) + const key = csvRow[columnName] + const value = csvRow[`variant option ${counter} value`] if (!isPresent(value)) { throw createError(rowNumber, `Missing option value for "${columnName}"`) @@ -336,6 +340,52 @@ const knownWildcardColumns = 
Object.keys(productWildcardColumns) * the required fields in the normalized output. */ export class CSVNormalizer { + /** + * Normalizes a row by converting all keys to lowercase and removing + * the leading "\r" from the keys and the values. + * + * Also, it values the row to contain unknown columns and must contain + * the "product id" or "product handle" columns. + */ + static preProcess( + row: Record, + rowNumber: number + ): NormalizedRow { + const unknownColumns: string[] = [] + + const normalized = Object.keys(row).reduce((result, key) => { + const lowerCaseKey = normalizeCSVValue(key).toLowerCase() + + if ( + !knownStaticColumns.includes(lowerCaseKey) && + !knownWildcardColumns.some((column) => lowerCaseKey.startsWith(column)) + ) { + unknownColumns.push(key) + } + + result[lowerCaseKey] = normalizeCSVValue(row[key]) + return result + }, {}) + + if (unknownColumns.length) { + throw new MedusaError( + MedusaError.Types.INVALID_DATA, + `Invalid column name(s) "${unknownColumns.join('","')}"` + ) + } + + const productId = normalized["product id"] + const productHandle = normalized["product handle"] + if (!isPresent(productId) && !isPresent(productHandle)) { + throw createError( + rowNumber, + "Missing product id and handle. One of these columns are required to process the row" + ) + } + + return normalized as NormalizedRow + } + #rows: Record[] #products: { @@ -354,30 +404,10 @@ export class CSVNormalizer { toUpdate: {}, } - constructor(rows: Record[]) { + constructor(rows: NormalizedRow[]) { this.#rows = rows } - /** - * Ensures atleast one of the product id or the handle is provided. Otherwise - * we cannot process the row - */ - #ensureRowHasProductIdentifier( - row: Record, - rowNumber: number - ) { - const productId = row["product id"] - const productHandle = row["product handle"] - if (!isPresent(productId) && !isPresent(productHandle)) { - throw createError( - rowNumber, - "Missing product id and handle. One of them are required to process the row" - ) - } - - return { productId, productHandle } - } - /** * Initializes a product object or returns an existing one * by its id. The products with ids are treated as updates @@ -400,37 +430,6 @@ export class CSVNormalizer { return this.#products.toCreate[handle]! } - /** - * Normalizes a row by converting all keys to lowercase and creating a - * new object - */ - #normalizeRow(row: Record) { - const unknownColumns: string[] = [] - - const normalized = Object.keys(row).reduce((result, key) => { - const lowerCaseKey = key.toLowerCase() - result[lowerCaseKey] = row[key] - - if ( - !knownStaticColumns.includes(lowerCaseKey) && - !knownWildcardColumns.some((column) => lowerCaseKey.startsWith(column)) - ) { - unknownColumns.push(key) - } - - return result - }, {}) - - if (unknownColumns.length) { - throw new MedusaError( - MedusaError.Types.INVALID_DATA, - `Invalid column name(s) "${unknownColumns.join('","')}"` - ) - } - - return normalized - } - /** * Processes a given CSV row */ @@ -439,10 +438,8 @@ export class CSVNormalizer { rowNumber: number ) { const rowColumns = Object.keys(row) - const { productHandle, productId } = this.#ensureRowHasProductIdentifier( - row, - rowNumber - ) + const productId = row["product id"] + const productHandle = row["product handle"] /** * Create representation of a product by its id or handle and process @@ -508,10 +505,11 @@ export class CSVNormalizer { /** * Process CSV rows. 
The return value is a tree of products */ - proccess() { + proccess(resumingFromIndex: number = 0) { this.#rows.forEach((row, index) => - this.#processRow(this.#normalizeRow(row), index + 1) + this.#processRow(row, resumingFromIndex + index + 1) ) + this.#rows = [] return this.#products } } diff --git a/yarn.lock b/yarn.lock index b13a3c50d6..8159f00143 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6086,6 +6086,7 @@ __metadata: "@swc/core": ^1.7.28 "@swc/jest": ^0.2.36 awilix: ^8.0.1 + csv-parse: ^5.6.0 expect-type: ^0.20.0 jest: ^29.7.0 json-2-csv: ^5.5.4 @@ -6343,6 +6344,7 @@ __metadata: typescript: ^5.6.2 vite: ^5.4.14 zod: 3.22.4 + zod-validation-error: ^3.4.1 peerDependencies: "@aws-sdk/client-dynamodb": ^3.218.0 "@medusajs/cli": 2.8.3 @@ -19320,6 +19322,13 @@ __metadata: languageName: node linkType: hard +"csv-parse@npm:^5.6.0": + version: 5.6.0 + resolution: "csv-parse@npm:5.6.0" + checksum: 52f5e6c45359902e0c8e57fc2eeed41366dc6b6d283b495b538dd50c8e8510413d6f924096ea056319cbbb8ed26e111c3a3485d7985c021bcf5abaa9e92425c7 + languageName: node + linkType: hard + "csv-stringify@npm:^5.6.5": version: 5.6.5 resolution: "csv-stringify@npm:5.6.5" @@ -35567,6 +35576,15 @@ __metadata: languageName: node linkType: hard +"zod-validation-error@npm:^3.4.1": + version: 3.4.1 + resolution: "zod-validation-error@npm:3.4.1" + peerDependencies: + zod: ^3.24.4 + checksum: cf16f12fccb3e515d18c876c8a75ae4a87219b28e8e7f6334b8d423bebfa2c08b3382d7c53842ba05af8c5caabf66ee8df1ce2862b3b41c2e96eba26e70a995f + languageName: node + linkType: hard + "zod@npm:3.22.4": version: 3.22.4 resolution: "zod@npm:3.22.4"
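For reference, here is a minimal, self-contained sketch of the chunking loop described in the PR description above. It only illustrates the idea of cutting chunks at product boundaries while streaming the CSV; the `writeChunk` helper, the chunk file naming, and the `"Product Id"`/`"Product Handle"` column names are assumptions made for this example. The actual step additionally normalizes each row with `CSVNormalizer.preProcess`, validates the chunk with the zod product validators, and writes it through the file module provider.

```ts
import { createReadStream } from "node:fs"
import { writeFile } from "node:fs/promises"
import { parse } from "csv-parse"

type CsvRow = Record<string, string>

// Hypothetical sink for a finished chunk: here we simply write it to a
// numbered JSON file next to the source CSV.
async function writeChunk(baseName: string, index: number, rows: CsvRow[]) {
  await writeFile(`${baseName}-${index}.json`, JSON.stringify(rows))
}

// Reads a CSV file as a stream and groups rows into chunks of roughly
// `rowsPerChunk` rows, while keeping all rows that belong to the same
// product (same "Product Id" or "Product Handle") inside a single chunk.
export async function chunkCsvFile(
  filePath: string,
  rowsPerChunk = 1000
): Promise<number> {
  const parser = createReadStream(filePath).pipe(
    parse({ columns: true, skip_empty_lines: true })
  )

  let chunk: CsvRow[] = []
  let chunkCount = 0
  let previousKey: string | undefined

  for await (const row of parser as AsyncIterable<CsvRow>) {
    const key = row["Product Id"] || row["Product Handle"]

    // Only cut a chunk at a product boundary, so the variants of one
    // product never end up split across two chunks.
    if (chunk.length >= rowsPerChunk && key !== previousKey) {
      await writeChunk(filePath, ++chunkCount, chunk)
      chunk = []
    }

    chunk.push(row)
    previousKey = key
  }

  // Flush the remaining rows collected after the last boundary.
  if (chunk.length) {
    await writeChunk(filePath, ++chunkCount, chunk)
  }

  return chunkCount
}
```

Calling `chunkCsvFile("products.csv")` would produce `products.csv-1.json`, `products.csv-2.json`, and so on, mirroring how the step writes one JSON file per validated chunk before the import is confirmed.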