feat: wire up direct uploads with local file provider (#12643)

This commit is contained in:
Harminder Virk
2025-06-10 11:37:54 +02:00
committed by GitHub
parent 1a78476608
commit f2cb528a56
8 changed files with 112 additions and 57 deletions

View File

@@ -14,6 +14,8 @@ export type NormalizeProductCsvV1StepInput = string
export const normalizeCsvToChunksStepId = "normalize-product-csv-to-chunks"
type Chunk = { id: string; toCreate: number; toUpdate: number }
/**
* Processes a chunk of products by writing them to a file. Later the
* file will be processed after the import has been confirmed.
@@ -23,7 +25,7 @@ async function processChunk(
fileKey: string,
csvRows: ReturnType<(typeof CSVNormalizer)["preProcess"]>[],
currentRowNumber: number
) {
): Promise<Chunk> {
const normalizer = new CSVNormalizer(csvRows)
const products = normalizer.proccess(currentRowNumber)
@@ -76,7 +78,7 @@ async function createChunks(
file: IFileModuleService,
fileKey: string,
stream: Parser
) {
): Promise<Chunk[]> {
/**
* The row under process
*/
@@ -97,7 +99,7 @@ async function createChunks(
* Validated chunks that have been written with the file
* provider
*/
const chunks: { id: string; toCreate: number; toUpdate: number }[] = []
const chunks: Chunk[] = []
/**
* Currently collected rows to be processed as one chunk
@@ -192,36 +194,51 @@ async function createChunks(
export const normalizeCsvToChunksStep = createStep(
normalizeCsvToChunksStepId,
async (fileKey: NormalizeProductCsvV1StepInput, { container }) => {
const file = container.resolve(Modules.FILE)
const contents = await file.getDownloadStream(fileKey)
const chunks = await createChunks(
file,
fileKey,
contents.pipe(
parse({
return new Promise<
StepResponse<{
chunks: Chunk[]
summary: Omit<Chunk, "id">
}>
>(async (resolve, reject) => {
try {
const file = container.resolve(Modules.FILE)
const contents = await file.getDownloadStream(fileKey)
const transformer = parse({
columns: true,
skip_empty_lines: true,
})
)
)
const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>(
(result, chunk) => {
result.toCreate = result.toCreate + chunk.toCreate
result.toUpdate = result.toUpdate + chunk.toUpdate
return result
},
{ toCreate: 0, toUpdate: 0 }
)
contents.on("error", reject)
/**
* Delete CSV file once we have the chunks
*/
await file.deleteFiles(fileKey)
const chunks = await createChunks(
file,
fileKey,
contents.pipe(transformer)
)
return new StepResponse({
chunks,
summary,
const summary = chunks.reduce<{ toCreate: number; toUpdate: number }>(
(result, chunk) => {
result.toCreate = result.toCreate + chunk.toCreate
result.toUpdate = result.toUpdate + chunk.toUpdate
return result
},
{ toCreate: 0, toUpdate: 0 }
)
/**
* Delete CSV file once we have the chunks
*/
await file.deleteFiles(fileKey)
resolve(
new StepResponse({
chunks,
summary,
})
)
} catch (error) {
reject(error)
}
})
}
)

View File

@@ -12,7 +12,10 @@ export const processImportChunksStepId = "process-import-chunks"
* const data = parseProductCsvStep("products.csv")
*/
export const processImportChunksStep = createStep(
processImportChunksStepId,
{
name: processImportChunksStepId,
async: true,
},
async (input: { chunks: { id: string }[] }, { container }) => {
const file = container.resolve(Modules.FILE)