03221c4a19
* init: copy PI files * feat: add subscribers, refactor strategies folder * wip: strategies integration tests package * fix: rename * wip: use redis * wip: use redis deps, redis setup in local tests * fix: naming collision, medusa config * fix: typing, update apply changes for new event ordering and reimplement interface * feat: make redis container run in integration tests * fix: missing yarn lock * feat: redis setup v2 * fix: setup server imports * fix: a lot of integration issues * fix: a lot of integration issues v2, transform tags, fix `ops` object parsing * wip: parsing product options * feat: ✨creating product and variants works, processing product/variant options, update schema * fix: query keys, logic for finding existing variant * fix: types * feat: update product variant's options * feat: parse MA records * feat: creating/updating MA records, region detection, error handling * feat: throw an error when creating an MA for nonexistent region * refactor: remove unused methods * refactor: use provided ids to track records, extract a couple of methods * refactor: remove unused method * refactor/wip: add initial comment for main methods * refactor: replace usage of RedisJSON functionality with basic k/v api * feat: async progress report * types: define more precise types, cleanup * feat: error handling * feat: unit testing preprocessing * feat: integration testing for CI, fix legacy bug where user is unable to create a variant if regional price is also sent as payload, add csv for integration tests * fix: error throw for logs * feat: add product endpoint snap * refactor: remove log * feat: add snaps, rebase * refactor: add comments * feat: snap update * refactor: typo * refactor: change error handler * feat: Redis cleanup after the job is done * testing :fix product unit test, remove integration snap, add inline object matcher * testing: fix obsolete snaps * refactor: update comments * fix: rebase issue * fix: rebase issue v2, remove log form an integration 
test * fix: try reverting setup server * fix: insert variants test * refactor: don't pass tx manager, refactor methods * refactor: don't use regionRepo, add `retrieveByName` to region repo * refactor: don't use productRepo * refactor: don't use `productVariantRepo` * refactor: remove repo mocks from unit tests * fix: product import unit tests * feat: file cleanup on finalize, kill test logs * wip: use files to persist ops instead of redis, move strategy class into `batch-job` folder * fix: minio delete method, add file cleanup method to import, fix promise coordination * fix: replace redis methods * feat: store import ops as a file instead of Redis * feat: test cleanup * fix: change unit tests after Redis logic removal * feat: use `results` for progress reporting, add `stat_descriptors` info after preprocessing, remove redis mentions * feat: extract to other files, use directory from property, fix strategy loader to allow other files in `strategies` directory * feat: fix instance progress counter * fix: mock services types * fix: update snaps * fix: error handling stream, fix test file service name generation * fix: remove dir with tmp files after testing * fix: new yarn.lock after rebase * fix: remove log, change object shape * fix: add DI types * refactor: remove container as a csv parser dep * fix: remove seeder, change typings * refactor: reimplement `retrieveByName` in the region service * fix: unit tests typings * fix: remove ts-ignore, complete typings for csv parser validators * fix: don't keep track of progress since it is redundant and only keep track of `advancement_count` * fix: return of the batch job seeder * fix: update find region by name method * fix: update types for service typings * fix: update redis type usage * fix: update unit tests file * fix: unit tests * fix: remove redis from integration test * feat: refactor region retrieval by name * feat: refactor product option update * fix: remove repo import * fix: return redis in test * fix: handle 
stream error * fix: tmp data cleanup Co-authored-by: fPolic <frane@medusajs.com>
199 lines
5.3 KiB
TypeScript
import { difference } from "lodash"
|
|
import Papa, { ParseConfig } from "papaparse"
|
|
|
|
import { AbstractParser } from "../interfaces/abstract-parser"
|
|
import { CsvParserContext, CsvSchema } from "../interfaces/csv-parser"
|
|
|
|
/**
 * Default PapaParse configuration used when the caller does not supply one:
 * the first row is treated as the header, and cell values are automatically
 * coerced to numbers/booleans where possible (`dynamicTyping`).
 */
const DEFAULT_PARSE_OPTIONS = {
  dynamicTyping: true,
  header: true,
}
class CsvParser<
|
|
TSchema extends CsvSchema<TParserResult, TOutputResult> = CsvSchema,
|
|
TParserResult = unknown,
|
|
TOutputResult = unknown
|
|
> extends AbstractParser<TSchema, TParserResult, ParseConfig, TOutputResult> {
|
|
protected readonly $$delimiter: string = ";"
|
|
|
|
constructor(schema: TSchema, delimiter?: string) {
|
|
super(schema)
|
|
if (delimiter) {
|
|
this.$$delimiter = delimiter
|
|
}
|
|
}
|
|
|
|
public async parse(
|
|
readableStream: NodeJS.ReadableStream,
|
|
options: ParseConfig = DEFAULT_PARSE_OPTIONS
|
|
): Promise<TParserResult[]> {
|
|
const csvStream = Papa.parse(Papa.NODE_STREAM_INPUT, options)
|
|
|
|
const parsedContent: TParserResult[] = []
|
|
readableStream.pipe(csvStream)
|
|
for await (const chunk of csvStream) {
|
|
parsedContent.push(chunk)
|
|
}
|
|
|
|
return parsedContent
|
|
}
|
|
|
|
async buildData(data: TParserResult[]): Promise<TOutputResult[]> {
|
|
const validatedData = [] as TOutputResult[]
|
|
for (let i = 0; i < data.length; i++) {
|
|
const builtLine = await this._buildLine(data[i], i + 1)
|
|
validatedData.push(builtLine)
|
|
}
|
|
return validatedData
|
|
}
|
|
|
|
private async _buildLine(
|
|
line: TParserResult,
|
|
lineNumber: number
|
|
): Promise<TOutputResult> {
|
|
let outputTuple = {} as TOutputResult
|
|
const columnMap = this.buildColumnMap_(this.$$schema.columns)
|
|
|
|
const tupleKeys = Object.keys(line)
|
|
|
|
/**
|
|
* map which keeps track of the columns processed
|
|
* used to detect any missing columns which are present in the schema but not in the line
|
|
*/
|
|
const processedColumns = {}
|
|
for (const tupleKey of tupleKeys) {
|
|
const column = this.resolveColumn_(tupleKey, columnMap)
|
|
|
|
/**
|
|
* if the tupleKey does not correspond to any column defined in the schema
|
|
*/
|
|
if (!column) {
|
|
throw new Error(
|
|
`Unable to treat column ${tupleKey} from the csv file. No target column found in the provided schema`
|
|
)
|
|
}
|
|
|
|
processedColumns[column.name] = true
|
|
|
|
/**
|
|
* if the value corresponding to the tupleKey is empty and the column is required in the schema
|
|
*/
|
|
if (!line[tupleKey] && column.required) {
|
|
throw new Error(
|
|
`No value found for target column "${column.name}" in line ${lineNumber} of the given csv file`
|
|
)
|
|
}
|
|
|
|
const context = {
|
|
line,
|
|
lineNumber,
|
|
column: column.name,
|
|
tupleKey,
|
|
}
|
|
|
|
outputTuple = this.resolveTuple_(outputTuple, column, context)
|
|
}
|
|
|
|
/**
|
|
* missing columns = columns defined in the schema - columns present in the line
|
|
*/
|
|
const missingColumns = difference(
|
|
Object.keys(columnMap),
|
|
Object.keys(processedColumns)
|
|
)
|
|
|
|
if (missingColumns.length > 0) {
|
|
throw new Error(
|
|
`Missing column(s) ${formatMissingColumns(
|
|
missingColumns
|
|
)} from the given csv file`
|
|
)
|
|
}
|
|
|
|
/**
|
|
* Runs the validation defined in the schema columns
|
|
*/
|
|
for (const column of this.$$schema.columns) {
|
|
const context = {
|
|
line,
|
|
lineNumber,
|
|
column: column.name,
|
|
}
|
|
|
|
if (column.validator) {
|
|
await column.validator.validate(outputTuple, context)
|
|
}
|
|
}
|
|
|
|
return outputTuple
|
|
}
|
|
|
|
private buildColumnMap_(
|
|
columns: TSchema["columns"]
|
|
): Record<string, TSchema["columns"][number]> {
|
|
return columns.reduce((map, column) => {
|
|
if (typeof column.name === "string") {
|
|
map[column.name] = column
|
|
}
|
|
return map
|
|
}, {})
|
|
}
|
|
|
|
private resolveColumn_(
|
|
tupleKey: string,
|
|
columnMap: Record<string, TSchema["columns"][number]>
|
|
): TSchema["columns"][number] | undefined {
|
|
if (columnMap[tupleKey]) {
|
|
return columnMap[tupleKey]
|
|
}
|
|
|
|
const matchedColumn = this.$$schema.columns.find((column) =>
|
|
"match" in column &&
|
|
typeof column.match === "object" &&
|
|
column.match instanceof RegExp
|
|
? column.match.test(tupleKey)
|
|
: false
|
|
)
|
|
|
|
return matchedColumn
|
|
}
|
|
|
|
private resolveTuple_(
|
|
tuple: TOutputResult,
|
|
column: TSchema["columns"][number],
|
|
context: CsvParserContext<TParserResult> & { tupleKey: string }
|
|
): TOutputResult {
|
|
const outputTuple = { ...tuple }
|
|
const { tupleKey, ...csvContext } = context
|
|
const { line } = csvContext
|
|
|
|
let resolvedKey = tupleKey
|
|
/**
|
|
* if match is provided, then we should call the reducer if it's defined
|
|
* otherwise, before using the mapTo property, we should make sure match was not provided
|
|
*/
|
|
if ("match" in column && column.reducer) {
|
|
return column.reducer(outputTuple, tupleKey, line[tupleKey], csvContext)
|
|
} else if (!("match" in column) && "mapTo" in column && column.mapTo) {
|
|
resolvedKey = column.mapTo
|
|
}
|
|
|
|
const resolvedValue = column.transform
|
|
? column.transform(line[tupleKey], csvContext)
|
|
: line[tupleKey]
|
|
|
|
outputTuple[resolvedKey] = resolvedValue
|
|
|
|
return outputTuple
|
|
}
|
|
}
|
|
|
|
const formatMissingColumns = (list: string[]): string =>
|
|
list.reduce(
|
|
(text, curr, i, array) =>
|
|
text + (i < array.length - 1 ? `"${curr}", ` : `"${curr}"`),
|
|
""
|
|
)
|
|
|
|
export default CsvParser
|