diff --git a/.changeset/cold-chairs-visit.md b/.changeset/cold-chairs-visit.md new file mode 100644 index 0000000000..87bd11c5d9 --- /dev/null +++ b/.changeset/cold-chairs-visit.md @@ -0,0 +1,9 @@ +--- +"@medusajs/medusa": minor +"@medusajs/file-local": patch +"medusa-file-minio": patch +"medusa-file-s3": patch +"@medusajs/types": patch +--- + +feat(medusa-file-local): local file service streaming methods diff --git a/packages/medusa-file-local/src/services/local-file-service.ts b/packages/medusa-file-local/src/services/local-file-service.ts index 3b9ca44c8d..e5962221a2 100644 --- a/packages/medusa-file-local/src/services/local-file-service.ts +++ b/packages/medusa-file-local/src/services/local-file-service.ts @@ -1,11 +1,13 @@ +import { AbstractFileService, IFileService } from "@medusajs/medusa" import { - AbstractFileService, FileServiceGetUploadStreamResult, FileServiceUploadResult, - IFileService, -} from "@medusajs/medusa" + UploadStreamDescriptorType, +} from "@medusajs/types" + import fs from "fs" -import { parse } from "path" +import path from "path" +import stream from "stream" class LocalService extends AbstractFileService implements IFileService { protected uploadDir_: string @@ -14,7 +16,7 @@ class LocalService extends AbstractFileService implements IFileService { constructor({}, options) { super({}, options) - this.uploadDir_ = options.upload_dir || "uploads/images" + this.uploadDir_ = options.upload_dir || "uploads" this.backendUrl_ = options.backend_url || "http://localhost:9000" } @@ -29,40 +31,91 @@ class LocalService extends AbstractFileService implements IFileService { async uploadFile( file: Express.Multer.File, options = {} - ): Promise<{ url: string }> { - const parsedFilename = parse(file.originalname) + ): Promise { + const parsedFilename = path.parse(file.originalname) - const fileKey = `${parsedFilename.name}-${Date.now()}${parsedFilename.ext}` + if (parsedFilename.dir) { + this.ensureDirExists(parsedFilename.dir) + } + + const fileKey = path.join( + parsedFilename.dir, + `${Date.now()}-${parsedFilename.base}` + ) return new Promise((resolve, reject) => { fs.copyFile(file.path, `${this.uploadDir_}/${fileKey}`, (err) => { if (err) { + reject(err) throw err } const fileUrl = `${this.backendUrl_}/${this.uploadDir_}/${fileKey}` - resolve({ url: fileUrl }) + resolve({ url: fileUrl, key: fileKey }) }) }) } async delete(file): Promise { - throw Error("Not implemented") + const filePath = `${this.uploadDir_}/${file.fileKey}` + if (fs.existsSync(filePath)) { + fs.unlinkSync(filePath) + } } async getUploadStreamDescriptor( - fileData + fileData: UploadStreamDescriptorType ): Promise { - throw Error("Not implemented") + const parsedFilename = path.parse( + fileData.name + (fileData.ext ? `.${fileData.ext}` : "") + ) + + if (parsedFilename.dir) { + this.ensureDirExists(parsedFilename.dir) + } + + const fileKey = path.join( + parsedFilename.dir, + `${Date.now()}-${parsedFilename.base}` + ) + + const fileUrl = `${this.backendUrl_}/${this.uploadDir_}/${fileKey}` + + const pass = new stream.PassThrough() + const writeStream = fs.createWriteStream(`${this.uploadDir_}/${fileKey}`) + + pass.pipe(writeStream) // for consistency with the IFileService + + const promise = new Promise((res, rej) => { + writeStream.on("finish", res) + writeStream.on("error", rej) + }) + + return { url: fileUrl, fileKey, writeStream: pass, promise } } async getDownloadStream(fileData): Promise { - throw Error("Not implemented") + const filePath = `${this.uploadDir_}/${fileData.fileKey}` + return fs.createReadStream(filePath) } async getPresignedDownloadUrl(fileData): Promise { - throw Error("Not implemented") + return `${this.backendUrl_}/${this.uploadDir_}/${fileData.fileKey}` + } + + /** + * Ensure `uploadDir_` has nested directories provided as file path + * + * @param dirPath - file path relative to the base directory + * @private + */ + private ensureDirExists(dirPath: string) { + const relativePath = path.join(this.uploadDir_, dirPath) + + if (!fs.existsSync(relativePath)) { + fs.mkdirSync(relativePath, { recursive: true }) + } } } diff --git a/packages/medusa-file-minio/src/services/minio.ts b/packages/medusa-file-minio/src/services/minio.ts index 0c656766b0..051c75be6b 100644 --- a/packages/medusa-file-minio/src/services/minio.ts +++ b/packages/medusa-file-minio/src/services/minio.ts @@ -1,11 +1,10 @@ +import { AbstractFileService, IFileService } from "@medusajs/medusa" import { - AbstractFileService, DeleteFileType, FileServiceUploadResult, GetUploadedFileType, - IFileService, UploadStreamDescriptorType, -} from "@medusajs/medusa" +} from "@medusajs/types" import { ClientConfiguration, PutObjectRequest } from "aws-sdk/clients/s3" import { MedusaError } from "medusa-core-utils" diff --git a/packages/medusa-file-s3/src/services/s3.ts b/packages/medusa-file-s3/src/services/s3.ts index 7ce87049f2..f39251126a 100644 --- a/packages/medusa-file-s3/src/services/s3.ts +++ b/packages/medusa-file-s3/src/services/s3.ts @@ -1,14 +1,13 @@ import fs from "fs" import aws from "aws-sdk" import { parse } from "path" +import { AbstractFileService, IFileService } from "@medusajs/medusa" import { - AbstractFileService, DeleteFileType, FileServiceUploadResult, GetUploadedFileType, - IFileService, UploadStreamDescriptorType, -} from "@medusajs/medusa" +} from "@medusajs/types" import stream from "stream" import { PutObjectRequest } from "aws-sdk/clients/s3" import { ClientConfiguration } from "aws-sdk/clients/s3" diff --git a/packages/medusa/src/interfaces/file-service.ts b/packages/medusa/src/interfaces/file-service.ts index ca6280963d..62aeb27d68 100644 --- a/packages/medusa/src/interfaces/file-service.ts +++ b/packages/medusa/src/interfaces/file-service.ts @@ -1,35 +1,11 @@ -import stream from "stream" import { TransactionBaseService } from "./transaction-base-service" - -export type FileServiceUploadResult = { - url: string -} - -export type FileServiceGetUploadStreamResult = { - writeStream: stream.PassThrough - promise: Promise - url: string - fileKey: string - [x: string]: unknown -} - -export type GetUploadedFileType = { - fileKey: string - isPrivate?: boolean - [x: string]: unknown -} - -export type DeleteFileType = { - fileKey: string - [x: string]: unknown -} - -export type UploadStreamDescriptorType = { - name: string - ext?: string - isPrivate?: boolean - [x: string]: unknown -} +import { + DeleteFileType, + FileServiceGetUploadStreamResult, + FileServiceUploadResult, + GetUploadedFileType, + UploadStreamDescriptorType, +} from "@medusajs/types" export interface IFileService extends TransactionBaseService { /** diff --git a/packages/medusa/src/services/file.ts b/packages/medusa/src/services/file.ts index 82cb1cbb8d..adaaa2930e 100644 --- a/packages/medusa/src/services/file.ts +++ b/packages/medusa/src/services/file.ts @@ -1,12 +1,12 @@ import { MedusaError } from "medusa-core-utils" import { EntityManager } from "typeorm" +import { AbstractFileService } from "../interfaces" import { - AbstractFileService, FileServiceGetUploadStreamResult, FileServiceUploadResult, GetUploadedFileType, UploadStreamDescriptorType, -} from "../interfaces" +} from "@medusajs/types" class DefaultFileService extends AbstractFileService { async upload( diff --git a/packages/medusa/src/strategies/batch-jobs/price-list/import.ts b/packages/medusa/src/strategies/batch-jobs/price-list/import.ts index 94f586ca0c..daf75d4dc7 100644 --- a/packages/medusa/src/strategies/batch-jobs/price-list/import.ts +++ b/packages/medusa/src/strategies/batch-jobs/price-list/import.ts @@ -21,6 +21,9 @@ import { TBuiltPriceListImportLine, TParsedPriceListImportRowData, } from "./types" +import { BatchJob } from "../../../models" +import { TParsedProductImportRowData } from "../product/types" +import { PriceListPriceCreateInput } from "../../../types/price-list" /* * Default strategy class used for a batch import of products/variants. @@ -298,7 +301,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { // Upload new prices for price list const priceImportOperations = await this.downloadImportOpsFile( - batchJobId, + batchJob, OperationType.PricesCreate ) @@ -306,7 +309,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { priceImportOperations.map(async (op) => { await txPriceListService.addPrices( priceListId, - op.prices.map((p) => { + (op.prices as PriceListPriceCreateInput[]).map((p) => { return { ...p, variant_id: op.variant_id, @@ -328,14 +331,16 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { */ protected async uploadImportOpsFile( batchJobId: string, - results: Record + results: Record ): Promise { const uploadPromises: Promise[] = [] const transactionManager = this.transactionManager_ ?? this.manager_ + const files: Record = {} + for (const op in results) { if (results[op]?.length) { - const { writeStream, promise } = await this.fileService_ + const { writeStream, fileKey, promise } = await this.fileService_ .withTransaction(transactionManager) .getUploadStreamDescriptor({ name: PriceListImportStrategy.buildFilename(batchJobId, op), @@ -344,33 +349,38 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { uploadPromises.push(promise) + files[op] = fileKey writeStream.write(JSON.stringify(results[op])) writeStream.end() } } + await this.batchJobService_ + .withTransaction(transactionManager) + .update(batchJobId, { + result: { files }, + }) + await Promise.all(uploadPromises) } /** - * Remove parsed ops JSON file. + * Download parsed ops JSON file. * - * @param batchJobId - An id of the current batch job being processed. + * @param batchJob - the current batch job being processed * @param op - Type of import operation. */ protected async downloadImportOpsFile( - batchJobId: string, + batchJob: BatchJob, op: OperationType - ): Promise { + ): Promise { let data = "" const transactionManager = this.transactionManager_ ?? this.manager_ const readableStream = await this.fileService_ .withTransaction(transactionManager) .getDownloadStream({ - fileKey: PriceListImportStrategy.buildFilename(batchJobId, op, { - appendExt: ".json", - }), + fileKey: batchJob.result.files![op], }) return await new Promise((resolve) => { @@ -382,7 +392,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { }) readableStream.on("error", () => { // TODO: maybe should throw - resolve([] as PriceListImportOperation[]) + resolve([] as TParsedProductImportRowData[]) }) }) } @@ -390,18 +400,16 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { /** * Delete parsed CSV ops files. * - * @param batchJobId - An id of the current batch job being processed. + * @param batchJob - the current batch job being processed */ - protected async deleteOpsFiles(batchJobId: string): Promise { + protected async deleteOpsFiles(batchJob: BatchJob): Promise { const transactionManager = this.transactionManager_ ?? this.manager_ const fileServiceTx = this.fileService_.withTransaction(transactionManager) - for (const op of Object.values(OperationType)) { + for (const fileName of Object.values(batchJob.result.files!)) { try { await fileServiceTx.delete({ - fileKey: PriceListImportStrategy.buildFilename(batchJobId, op, { - appendExt: ".json", - }), + fileKey: fileName, }) } catch (e) { // noop @@ -432,7 +440,7 @@ class PriceListImportStrategy extends AbstractBatchJobStrategy { .withTransaction(transactionManager) .delete({ fileKey }) - await this.deleteOpsFiles(batchJob.id) + await this.deleteOpsFiles(batchJob) } private static buildFilename( diff --git a/packages/medusa/src/strategies/batch-jobs/product/import.ts b/packages/medusa/src/strategies/batch-jobs/product/import.ts index 0a39107904..fe4f8efe4f 100644 --- a/packages/medusa/src/strategies/batch-jobs/product/import.ts +++ b/packages/medusa/src/strategies/batch-jobs/product/import.ts @@ -8,30 +8,30 @@ import ProductCategoryFeatureFlag from "../../../loaders/feature-flags/product-c import SalesChannelFeatureFlag from "../../../loaders/feature-flags/sales-channels" import { BatchJob, SalesChannel } from "../../../models" import { - BatchJobService, - ProductCategoryService, - ProductCollectionService, - ProductService, - ProductVariantService, - RegionService, - SalesChannelService, - ShippingProfileService, + BatchJobService, + ProductCategoryService, + ProductCollectionService, + ProductService, + ProductVariantService, + RegionService, + SalesChannelService, + ShippingProfileService, } from "../../../services" import CsvParser from "../../../services/csv-parser" import { CreateProductInput } from "../../../types/product" import { CreateProductVariantInput } from "../../../types/product-variant" import { - OperationType, - ProductImportBatchJob, - ProductImportCsvSchema, - ProductImportInjectedProps, - ProductImportJobContext, - TParsedProductImportRowData, + OperationType, + ProductImportBatchJob, + ProductImportCsvSchema, + ProductImportInjectedProps, + ProductImportJobContext, + TParsedProductImportRowData, } from "./types" import { - productImportColumnsDefinition, - productImportProductCategoriesColumnsDefinition, - productImportSalesChannelsColumnsDefinition, + productImportColumnsDefinition, + productImportProductCategoriesColumnsDefinition, + productImportSalesChannelsColumnsDefinition, } from "./types/columns-definition" import { transformProductData, transformVariantData } from "./utils" @@ -420,7 +420,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const transactionManager = this.transactionManager_ ?? this.manager_ const productOps = await this.downloadImportOpsFile( - batchJob.id, + batchJob, OperationType.ProductCreate ) @@ -492,7 +492,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const transactionManager = this.transactionManager_ ?? this.manager_ const productOps = await this.downloadImportOpsFile( - batchJob.id, + batchJob, OperationType.ProductUpdate ) @@ -568,7 +568,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const transactionManager = this.transactionManager_ ?? this.manager_ const variantOps = await this.downloadImportOpsFile( - batchJob.id, + batchJob, OperationType.VariantCreate ) @@ -624,7 +624,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const transactionManager = this.transactionManager_ ?? this.manager_ const variantOps = await this.downloadImportOpsFile( - batchJob.id, + batchJob, OperationType.VariantUpdate ) @@ -691,9 +691,11 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const uploadPromises: Promise[] = [] const transactionManager = this.transactionManager_ ?? this.manager_ + const files: Record = {} + for (const op in results) { if (results[op]?.length) { - const { writeStream, promise } = await this.fileService_ + const { writeStream, fileKey, promise } = await this.fileService_ .withTransaction(transactionManager) .getUploadStreamDescriptor({ name: ProductImportStrategy.buildFilename(batchJobId, op), @@ -702,22 +704,29 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { uploadPromises.push(promise) + files[op] = fileKey writeStream.write(JSON.stringify(results[op])) writeStream.end() } } + await this.batchJobService_ + .withTransaction(transactionManager) + .update(batchJobId, { + result: { files }, + }) + await Promise.all(uploadPromises) } /** - * Remove parsed ops JSON file. + * Download parsed ops JSON file. * - * @param batchJobId - An id of the current batch job being processed. + * @param batchJob - the current batch job being processed * @param op - Type of import operation. */ protected async downloadImportOpsFile( - batchJobId: string, + batchJob: BatchJob, op: OperationType ): Promise { let data = "" @@ -726,9 +735,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { const readableStream = await this.fileService_ .withTransaction(transactionManager) .getDownloadStream({ - fileKey: ProductImportStrategy.buildFilename(batchJobId, op, { - appendExt: ".json", - }), + fileKey: batchJob.result.files![op], }) return await new Promise((resolve) => { @@ -748,18 +755,16 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { /** * Delete parsed CSV ops files. * - * @param batchJobId - An id of the current batch job being processed. + * @param batchJob - the current batch job being processed */ - protected async deleteOpsFiles(batchJobId: string): Promise { + protected async deleteOpsFiles(batchJob: BatchJob): Promise { const transactionManager = this.transactionManager_ ?? this.manager_ const fileServiceTx = this.fileService_.withTransaction(transactionManager) - for (const op of Object.values(OperationType)) { + for (const fileName of Object.values(batchJob.result.files!)) { try { await fileServiceTx.delete({ - fileKey: ProductImportStrategy.buildFilename(batchJobId, op, { - appendExt: ".json", - }), + fileKey: fileName, }) } catch (e) { // noop @@ -790,7 +795,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy { .withTransaction(transactionManager) .delete({ fileKey }) - await this.deleteOpsFiles(batchJob.id) + await this.deleteOpsFiles(batchJob) } /** diff --git a/packages/types/src/file-service/index.ts b/packages/types/src/file-service/index.ts new file mode 100644 index 0000000000..42ae41d765 --- /dev/null +++ b/packages/types/src/file-service/index.ts @@ -0,0 +1,32 @@ +import stream from "stream" + +export type FileServiceUploadResult = { + url: string + key: string +} + +export type FileServiceGetUploadStreamResult = { + writeStream: stream.PassThrough + promise: Promise + url: string + fileKey: string + [x: string]: unknown +} + +export type GetUploadedFileType = { + fileKey: string + isPrivate?: boolean + [x: string]: unknown +} + +export type DeleteFileType = { + fileKey: string + [x: string]: unknown +} + +export type UploadStreamDescriptorType = { + name: string + ext?: string + isPrivate?: boolean + [x: string]: unknown +} diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 3247ffaa9d..311276e868 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -6,6 +6,7 @@ export * from "./common" export * from "./dal" export * from "./event-bus" export * from "./feature-flag" +export * from "./file-service" export * from "./inventory" export * from "./joiner" export * from "./logger"