feat(medusa-file-local): implement missing streaming methods (#4788)

This commit is contained in:
Frane Polić
2023-08-23 13:05:04 +02:00
committed by GitHub
parent 4843cc6301
commit d8a6e3e0d8
10 changed files with 189 additions and 107 deletions

View File

@@ -0,0 +1,9 @@
---
"@medusajs/medusa": minor
"@medusajs/file-local": patch
"medusa-file-minio": patch
"medusa-file-s3": patch
"@medusajs/types": patch
---
feat(medusa-file-local): local file service streaming methods

View File

@@ -1,11 +1,13 @@
import { AbstractFileService, IFileService } from "@medusajs/medusa"
import {
AbstractFileService,
FileServiceGetUploadStreamResult,
FileServiceUploadResult,
IFileService,
} from "@medusajs/medusa"
UploadStreamDescriptorType,
} from "@medusajs/types"
import fs from "fs"
import { parse } from "path"
import path from "path"
import stream from "stream"
class LocalService extends AbstractFileService implements IFileService {
protected uploadDir_: string
@@ -14,7 +16,7 @@ class LocalService extends AbstractFileService implements IFileService {
constructor({}, options) {
super({}, options)
this.uploadDir_ = options.upload_dir || "uploads/images"
this.uploadDir_ = options.upload_dir || "uploads"
this.backendUrl_ = options.backend_url || "http://localhost:9000"
}
@@ -29,40 +31,91 @@ class LocalService extends AbstractFileService implements IFileService {
async uploadFile(
file: Express.Multer.File,
options = {}
): Promise<{ url: string }> {
const parsedFilename = parse(file.originalname)
): Promise<FileServiceUploadResult> {
const parsedFilename = path.parse(file.originalname)
const fileKey = `${parsedFilename.name}-${Date.now()}${parsedFilename.ext}`
if (parsedFilename.dir) {
this.ensureDirExists(parsedFilename.dir)
}
const fileKey = path.join(
parsedFilename.dir,
`${Date.now()}-${parsedFilename.base}`
)
return new Promise((resolve, reject) => {
fs.copyFile(file.path, `${this.uploadDir_}/${fileKey}`, (err) => {
if (err) {
reject(err)
throw err
}
const fileUrl = `${this.backendUrl_}/${this.uploadDir_}/${fileKey}`
resolve({ url: fileUrl })
resolve({ url: fileUrl, key: fileKey })
})
})
}
async delete(file): Promise<void> {
throw Error("Not implemented")
const filePath = `${this.uploadDir_}/${file.fileKey}`
if (fs.existsSync(filePath)) {
fs.unlinkSync(filePath)
}
}
async getUploadStreamDescriptor(
fileData
fileData: UploadStreamDescriptorType
): Promise<FileServiceGetUploadStreamResult> {
throw Error("Not implemented")
const parsedFilename = path.parse(
fileData.name + (fileData.ext ? `.${fileData.ext}` : "")
)
if (parsedFilename.dir) {
this.ensureDirExists(parsedFilename.dir)
}
const fileKey = path.join(
parsedFilename.dir,
`${Date.now()}-${parsedFilename.base}`
)
const fileUrl = `${this.backendUrl_}/${this.uploadDir_}/${fileKey}`
const pass = new stream.PassThrough()
const writeStream = fs.createWriteStream(`${this.uploadDir_}/${fileKey}`)
pass.pipe(writeStream) // for consistency with the IFileService
const promise = new Promise((res, rej) => {
writeStream.on("finish", res)
writeStream.on("error", rej)
})
return { url: fileUrl, fileKey, writeStream: pass, promise }
}
async getDownloadStream(fileData): Promise<NodeJS.ReadableStream> {
throw Error("Not implemented")
const filePath = `${this.uploadDir_}/${fileData.fileKey}`
return fs.createReadStream(filePath)
}
async getPresignedDownloadUrl(fileData): Promise<string> {
throw Error("Not implemented")
return `${this.backendUrl_}/${this.uploadDir_}/${fileData.fileKey}`
}
/**
 * Ensure that the nested directory structure given by `dirPath` exists
 * inside the base upload directory (`uploadDir_`), creating any missing
 * directories recursively.
 *
 * @param dirPath - directory path relative to the base upload directory
 * @private
 */
private ensureDirExists(dirPath: string) {
const relativePath = path.join(this.uploadDir_, dirPath)
if (!fs.existsSync(relativePath)) {
fs.mkdirSync(relativePath, { recursive: true })
}
}
}

View File

@@ -1,11 +1,10 @@
import { AbstractFileService, IFileService } from "@medusajs/medusa"
import {
AbstractFileService,
DeleteFileType,
FileServiceUploadResult,
GetUploadedFileType,
IFileService,
UploadStreamDescriptorType,
} from "@medusajs/medusa"
} from "@medusajs/types"
import { ClientConfiguration, PutObjectRequest } from "aws-sdk/clients/s3"
import { MedusaError } from "medusa-core-utils"

View File

@@ -1,14 +1,13 @@
import fs from "fs"
import aws from "aws-sdk"
import { parse } from "path"
import { AbstractFileService, IFileService } from "@medusajs/medusa"
import {
AbstractFileService,
DeleteFileType,
FileServiceUploadResult,
GetUploadedFileType,
IFileService,
UploadStreamDescriptorType,
} from "@medusajs/medusa"
} from "@medusajs/types"
import stream from "stream"
import { PutObjectRequest } from "aws-sdk/clients/s3"
import { ClientConfiguration } from "aws-sdk/clients/s3"

View File

@@ -1,35 +1,11 @@
import stream from "stream"
import { TransactionBaseService } from "./transaction-base-service"
export type FileServiceUploadResult = {
url: string
}
export type FileServiceGetUploadStreamResult = {
writeStream: stream.PassThrough
promise: Promise<any>
url: string
fileKey: string
[x: string]: unknown
}
export type GetUploadedFileType = {
fileKey: string
isPrivate?: boolean
[x: string]: unknown
}
export type DeleteFileType = {
fileKey: string
[x: string]: unknown
}
export type UploadStreamDescriptorType = {
name: string
ext?: string
isPrivate?: boolean
[x: string]: unknown
}
import {
DeleteFileType,
FileServiceGetUploadStreamResult,
FileServiceUploadResult,
GetUploadedFileType,
UploadStreamDescriptorType,
} from "@medusajs/types"
export interface IFileService extends TransactionBaseService {
/**

View File

@@ -1,12 +1,12 @@
import { MedusaError } from "medusa-core-utils"
import { EntityManager } from "typeorm"
import { AbstractFileService } from "../interfaces"
import {
AbstractFileService,
FileServiceGetUploadStreamResult,
FileServiceUploadResult,
GetUploadedFileType,
UploadStreamDescriptorType,
} from "../interfaces"
} from "@medusajs/types"
class DefaultFileService extends AbstractFileService {
async upload(

View File

@@ -21,6 +21,9 @@ import {
TBuiltPriceListImportLine,
TParsedPriceListImportRowData,
} from "./types"
import { BatchJob } from "../../../models"
import { TParsedProductImportRowData } from "../product/types"
import { PriceListPriceCreateInput } from "../../../types/price-list"
/*
* Default strategy class used for a batch import of products/variants.
@@ -298,7 +301,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
// Upload new prices for price list
const priceImportOperations = await this.downloadImportOpsFile(
batchJobId,
batchJob,
OperationType.PricesCreate
)
@@ -306,7 +309,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
priceImportOperations.map(async (op) => {
await txPriceListService.addPrices(
priceListId,
op.prices.map((p) => {
(op.prices as PriceListPriceCreateInput[]).map((p) => {
return {
...p,
variant_id: op.variant_id,
@@ -328,14 +331,16 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
*/
protected async uploadImportOpsFile(
batchJobId: string,
results: Record<OperationType, PriceListImportOperation[]>
results: Record<OperationType, TParsedProductImportRowData[]>
): Promise<void> {
const uploadPromises: Promise<void>[] = []
const transactionManager = this.transactionManager_ ?? this.manager_
const files: Record<string, string> = {}
for (const op in results) {
if (results[op]?.length) {
const { writeStream, promise } = await this.fileService_
const { writeStream, fileKey, promise } = await this.fileService_
.withTransaction(transactionManager)
.getUploadStreamDescriptor({
name: PriceListImportStrategy.buildFilename(batchJobId, op),
@@ -344,33 +349,38 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
uploadPromises.push(promise)
files[op] = fileKey
writeStream.write(JSON.stringify(results[op]))
writeStream.end()
}
}
await this.batchJobService_
.withTransaction(transactionManager)
.update(batchJobId, {
result: { files },
})
await Promise.all(uploadPromises)
}
/**
* Remove parsed ops JSON file.
* Download parsed ops JSON file.
*
* @param batchJobId - An id of the current batch job being processed.
* @param batchJob - the current batch job being processed
* @param op - Type of import operation.
*/
protected async downloadImportOpsFile(
batchJobId: string,
batchJob: BatchJob,
op: OperationType
): Promise<PriceListImportOperation[]> {
): Promise<TParsedProductImportRowData[]> {
let data = ""
const transactionManager = this.transactionManager_ ?? this.manager_
const readableStream = await this.fileService_
.withTransaction(transactionManager)
.getDownloadStream({
fileKey: PriceListImportStrategy.buildFilename(batchJobId, op, {
appendExt: ".json",
}),
fileKey: batchJob.result.files![op],
})
return await new Promise((resolve) => {
@@ -382,7 +392,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
})
readableStream.on("error", () => {
// TODO: maybe should throw
resolve([] as PriceListImportOperation[])
resolve([] as TParsedProductImportRowData[])
})
})
}
@@ -390,18 +400,16 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
/**
* Delete parsed CSV ops files.
*
* @param batchJobId - An id of the current batch job being processed.
* @param batchJob - the current batch job being processed
*/
protected async deleteOpsFiles(batchJobId: string): Promise<void> {
protected async deleteOpsFiles(batchJob: BatchJob): Promise<void> {
const transactionManager = this.transactionManager_ ?? this.manager_
const fileServiceTx = this.fileService_.withTransaction(transactionManager)
for (const op of Object.values(OperationType)) {
for (const fileName of Object.values(batchJob.result.files!)) {
try {
await fileServiceTx.delete({
fileKey: PriceListImportStrategy.buildFilename(batchJobId, op, {
appendExt: ".json",
}),
fileKey: fileName,
})
} catch (e) {
// noop
@@ -432,7 +440,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy {
.withTransaction(transactionManager)
.delete({ fileKey })
await this.deleteOpsFiles(batchJob.id)
await this.deleteOpsFiles(batchJob)
}
private static buildFilename(

View File

@@ -8,30 +8,30 @@ import ProductCategoryFeatureFlag from "../../../loaders/feature-flags/product-c
import SalesChannelFeatureFlag from "../../../loaders/feature-flags/sales-channels"
import { BatchJob, SalesChannel } from "../../../models"
import {
BatchJobService,
ProductCategoryService,
ProductCollectionService,
ProductService,
ProductVariantService,
RegionService,
SalesChannelService,
ShippingProfileService,
BatchJobService,
ProductCategoryService,
ProductCollectionService,
ProductService,
ProductVariantService,
RegionService,
SalesChannelService,
ShippingProfileService,
} from "../../../services"
import CsvParser from "../../../services/csv-parser"
import { CreateProductInput } from "../../../types/product"
import { CreateProductVariantInput } from "../../../types/product-variant"
import {
OperationType,
ProductImportBatchJob,
ProductImportCsvSchema,
ProductImportInjectedProps,
ProductImportJobContext,
TParsedProductImportRowData,
OperationType,
ProductImportBatchJob,
ProductImportCsvSchema,
ProductImportInjectedProps,
ProductImportJobContext,
TParsedProductImportRowData,
} from "./types"
import {
productImportColumnsDefinition,
productImportProductCategoriesColumnsDefinition,
productImportSalesChannelsColumnsDefinition,
productImportColumnsDefinition,
productImportProductCategoriesColumnsDefinition,
productImportSalesChannelsColumnsDefinition,
} from "./types/columns-definition"
import { transformProductData, transformVariantData } from "./utils"
@@ -420,7 +420,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const transactionManager = this.transactionManager_ ?? this.manager_
const productOps = await this.downloadImportOpsFile(
batchJob.id,
batchJob,
OperationType.ProductCreate
)
@@ -492,7 +492,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const transactionManager = this.transactionManager_ ?? this.manager_
const productOps = await this.downloadImportOpsFile(
batchJob.id,
batchJob,
OperationType.ProductUpdate
)
@@ -568,7 +568,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const transactionManager = this.transactionManager_ ?? this.manager_
const variantOps = await this.downloadImportOpsFile(
batchJob.id,
batchJob,
OperationType.VariantCreate
)
@@ -624,7 +624,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const transactionManager = this.transactionManager_ ?? this.manager_
const variantOps = await this.downloadImportOpsFile(
batchJob.id,
batchJob,
OperationType.VariantUpdate
)
@@ -691,9 +691,11 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const uploadPromises: Promise<void>[] = []
const transactionManager = this.transactionManager_ ?? this.manager_
const files: Record<string, string> = {}
for (const op in results) {
if (results[op]?.length) {
const { writeStream, promise } = await this.fileService_
const { writeStream, fileKey, promise } = await this.fileService_
.withTransaction(transactionManager)
.getUploadStreamDescriptor({
name: ProductImportStrategy.buildFilename(batchJobId, op),
@@ -702,22 +704,29 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
uploadPromises.push(promise)
files[op] = fileKey
writeStream.write(JSON.stringify(results[op]))
writeStream.end()
}
}
await this.batchJobService_
.withTransaction(transactionManager)
.update(batchJobId, {
result: { files },
})
await Promise.all(uploadPromises)
}
/**
* Remove parsed ops JSON file.
* Download parsed ops JSON file.
*
* @param batchJobId - An id of the current batch job being processed.
* @param batchJob - the current batch job being processed
* @param op - Type of import operation.
*/
protected async downloadImportOpsFile(
batchJobId: string,
batchJob: BatchJob,
op: OperationType
): Promise<TParsedProductImportRowData[]> {
let data = ""
@@ -726,9 +735,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const readableStream = await this.fileService_
.withTransaction(transactionManager)
.getDownloadStream({
fileKey: ProductImportStrategy.buildFilename(batchJobId, op, {
appendExt: ".json",
}),
fileKey: batchJob.result.files![op],
})
return await new Promise((resolve) => {
@@ -748,18 +755,16 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
/**
* Delete parsed CSV ops files.
*
* @param batchJobId - An id of the current batch job being processed.
* @param batchJob - the current batch job being processed
*/
protected async deleteOpsFiles(batchJobId: string): Promise<void> {
protected async deleteOpsFiles(batchJob: BatchJob): Promise<void> {
const transactionManager = this.transactionManager_ ?? this.manager_
const fileServiceTx = this.fileService_.withTransaction(transactionManager)
for (const op of Object.values(OperationType)) {
for (const fileName of Object.values(batchJob.result.files!)) {
try {
await fileServiceTx.delete({
fileKey: ProductImportStrategy.buildFilename(batchJobId, op, {
appendExt: ".json",
}),
fileKey: fileName,
})
} catch (e) {
// noop
@@ -790,7 +795,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
.withTransaction(transactionManager)
.delete({ fileKey })
await this.deleteOpsFiles(batchJob.id)
await this.deleteOpsFiles(batchJob)
}
/**

View File

@@ -0,0 +1,32 @@
import stream from "stream"
/**
 * Result returned by a file service after a one-shot (non-streaming) upload.
 */
export type FileServiceUploadResult = {
/** Publicly reachable URL of the uploaded file. */
url: string
/** Storage key identifying the file within the backing store (used for later download/deletion). */
key: string
}
/**
 * Descriptor returned when a writable upload stream is requested.
 */
export type FileServiceGetUploadStreamResult = {
/** Pass-through stream the caller writes the file contents into. */
writeStream: stream.PassThrough
/** Resolves once the underlying write/upload has finished; rejects on failure. */
promise: Promise<any>
/** Publicly reachable URL the file will be available at. */
url: string
/** Storage key identifying the file being uploaded. */
fileKey: string
// index signature: implementations may attach provider-specific fields
[x: string]: unknown
}
/**
 * Input used to fetch an already-uploaded file (download stream or presigned URL).
 */
export type GetUploadedFileType = {
/** Storage key identifying the file. */
fileKey: string
/** Presumably marks the file as stored privately — confirm against provider implementations. */
isPrivate?: boolean
[x: string]: unknown
}
/**
 * Input used to delete an uploaded file.
 */
export type DeleteFileType = {
/** Storage key identifying the file to remove. */
fileKey: string
[x: string]: unknown
}
/**
 * Describes the file a caller intends to upload via a write stream.
 */
export type UploadStreamDescriptorType = {
/** Base name for the file; may include a relative directory prefix. */
name: string
/** Optional file extension, without the leading dot (implementations prepend "."). */
ext?: string
/** Presumably marks the file as stored privately — confirm against provider implementations. */
isPrivate?: boolean
[x: string]: unknown
}

View File

@@ -6,6 +6,7 @@ export * from "./common"
export * from "./dal"
export * from "./event-bus"
export * from "./feature-flag"
export * from "./file-service"
export * from "./inventory"
export * from "./joiner"
export * from "./logger"