feat: import strategy - sales channels support (#2124)

**What**
- add support for specifying sales channels with the import strategy
- additional:
  - refactor SC service to use `retrieve_` pattern
  - fix: pass arguments from `startServerWithEnvironment` to setup server
  - fix: minio undefined resolve/reject calls
  - fix: csv parser - detect missing columns from schema only if the column is required

**How**
- extending schema to expect sales channels columns in an import CSV file

RESOLVES CORE-304
This commit is contained in:
Frane Polić
2022-09-02 13:28:43 +02:00
committed by GitHub
parent d14a0398fb
commit 546a963f7b
12 changed files with 611 additions and 143 deletions

View File

@@ -0,0 +1,167 @@
const fs = require("fs")
const path = require("path")
const { useApi } = require("../../../../helpers/use-api")
const { useDb } = require("../../../../helpers/use-db")
const adminSeeder = require("../../../helpers/admin-seeder")
const userSeeder = require("../../../helpers/user-seeder")
const { simpleSalesChannelFactory } = require("../../../factories")
const batchJobSeeder = require("../../../helpers/batch-job-seeder")
const startServerWithEnvironment =
require("../../../../helpers/start-server-with-environment").default
jest.setTimeout(30000)
const adminReqConfig = {
headers: {
Authorization: "Bearer test_token",
},
}
/**
 * Remove the temporary batch-job ops files produced by an import run.
 * `force` + `recursive` make the call a no-op when the directory is absent.
 */
function cleanTempData() {
  const tmpOpsDir = path.resolve("__tests__", "batch-jobs", "product", "imports")
  fs.rmSync(tmpOpsDir, { force: true, recursive: true })
}
// Integration suite: importing products from a CSV that references
// sales channels (feature-flagged via MEDUSA_FF_SALES_CHANNELS).
describe("Product import - Sales Channel", () => {
  let dbConnection
  let medusaProcess

  beforeAll(async () => {
    const cwd = path.resolve(path.join(__dirname, "..", "..", ".."))
    cleanTempData()

    // Boot a dedicated server with the sales-channels feature flag on.
    // `uploadDir` points at this directory so the CSV fixture used below
    // ("product-import-ss.csv") can be resolved by the file service.
    // NOTE(review): the destructured `process` shadows the global
    // `process` object inside this callback — consider renaming.
    const [process, connection] = await startServerWithEnvironment({
      cwd,
      env: { MEDUSA_FF_SALES_CHANNELS: true },
      redisUrl: "redis://127.0.0.1:6379",
      uploadDir: __dirname,
      verbose: true,
    })
    dbConnection = connection
    medusaProcess = process
  })

  afterAll(async () => {
    const db = useDb()
    await db.shutdown()
    cleanTempData()
    medusaProcess.kill()
  })

  beforeEach(async () => {
    try {
      // Seed auth/job prerequisites plus the two sales channels that the
      // import CSV references by name.
      await batchJobSeeder(dbConnection)
      await adminSeeder(dbConnection)
      await userSeeder(dbConnection)
      await simpleSalesChannelFactory(dbConnection, {
        name: "Import Sales Channel 1",
      })
      await simpleSalesChannelFactory(dbConnection, {
        name: "Import Sales Channel 2",
      })
    } catch (e) {
      console.log(e)
      throw e
    }
  })

  afterEach(async () => {
    const db = useDb()
    return await db.teardown()
  })

  it("Import products to an existing sales channel", async () => {
    // NOTE(review): jest.setTimeout called inside a test body may not
    // affect the currently running test on all jest versions — confirm,
    // or move next to the module-level jest.setTimeout(30000).
    jest.setTimeout(1000000)
    const api = useApi()

    // Kick off the product-import batch job for the fixture CSV.
    const response = await api.post(
      "/admin/batch-jobs",
      {
        type: "product-import",
        context: {
          fileKey: "product-import-ss.csv",
        },
      },
      adminReqConfig
    )

    const batchJobId = response.data.batch_job.id

    // Poll the job (1 second between requests) until it reaches a
    // terminal state ("completed" or "failed").
    let batchJob
    let shouldContinuePulling = true
    while (shouldContinuePulling) {
      const res = await api.get(
        `/admin/batch-jobs/${batchJobId}`,
        adminReqConfig
      )

      await new Promise((resolve, _) => {
        setTimeout(resolve, 1000)
      })

      batchJob = res.data.batch_job

      shouldContinuePulling = !(
        batchJob.status === "completed" || batchJob.status === "failed"
      )
    }

    expect(batchJob.status).toBe("completed")

    const productsResponse = await api.get("/admin/products", adminReqConfig)

    expect(productsResponse.data.count).toBe(2)
    // NOTE(review): toEqual on an array asserts element order — this
    // relies on the API listing the imported products in this order.
    expect(productsResponse.data.products).toEqual([
      expect.objectContaining({
        id: "O6S1YQ6mKm",
        title: "Test product",
        description: "test-product-description-1",
        handle: "test-product-product-1",
        variants: [
          expect.objectContaining({
            title: "Test variant",
            product_id: "O6S1YQ6mKm",
            sku: "test-sku-1",
          }),
        ],
        // Channels resolved by name from the CSV's
        // "Sales Channel N Name" columns.
        sales_channels: [
          expect.objectContaining({
            name: "Import Sales Channel 1",
            is_disabled: false,
          }),
          expect.objectContaining({
            name: "Import Sales Channel 2",
            is_disabled: false,
          }),
        ],
      }),
      expect.objectContaining({
        id: "5VxiEkmnPV",
        title: "Test product",
        description: "test-product-description",
        handle: "test-product-product-2",
        variants: [
          expect.objectContaining({
            title: "Test variant",
            product_id: "5VxiEkmnPV",
            sku: "test-sku-2",
          }),
          expect.objectContaining({
            title: "Test variant",
            product_id: "5VxiEkmnPV",
            sku: "test-sku-3",
          }),
        ],
        // Rows with empty sales-channel cells produce no channel links.
        sales_channels: [],
      }),
    ])
  })
})

View File

@@ -74,7 +74,7 @@ describe("Product import batch job", () => {
const response = await api.post(
"/admin/batch-jobs",
{
type: "product_import",
type: "product-import",
context: {
fileKey: "product-import.csv",
},

View File

@@ -0,0 +1,4 @@
Product id,Product Handle,Product Title,Product Subtitle,Product Description,Product Status,Product Thumbnail,Product Weight,Product Length,Product Width,Product Height,Product HS Code,Product Origin Country,Product Mid Code,Product Material,Product Collection Title,Product Collection Handle,Product Type,Product Tags,Product Discountable,Product External ID,Product Profile Name,Product Profile Type,Variant id,Variant Title,Variant SKU,Variant Barcode,Variant Inventory Quantity,Variant Allow backorder,Variant Manage inventory,Variant Weight,Variant Length,Variant Width,Variant Height,Variant HS Code,Variant Origin Country,Variant Mid Code,Variant Material,Price ImportLand [EUR],Price USD,Price denmark [DKK],Price Denmark [DKK],Option 1 Name,Option 1 Value,Option 2 Name,Option 2 Value,Image 1 Url,Sales Channel 1 Name,Sales Channel 2 Name,Sales Channel 1 Id,Sales Channel 2 Id
O6S1YQ6mKm,test-product-product-1,Test product,,test-product-description-1,draft,,,,,,,,,,Test collection 1,test-collection1,test-type-1,123_1,TRUE,,profile_1,profile_type_1,,Test variant,test-sku-1,test-barcode-1,10,FALSE,TRUE,,,,,,,,,100,110,130,,test-option-1,option 1 value red,test-option-2,option 2 value 1,test-image.png,Import Sales Channel 1,Import Sales Channel 2,,
5VxiEkmnPV,test-product-product-2,Test product,,test-product-description,draft,,,,,,,,,,Test collection,test-collection2,test-type,123,TRUE,,profile_2,profile_type_2,,Test variant,test-sku-2,test-barcode-2,10,FALSE,TRUE,,,,,,,,,,,,110,test-option,Option 1 value 1,,,test-image.png,,,,
5VxiEkmnPV,test-product-product-2,Test product,,test-product-description,draft,,,,,,,,,,Test collection,test-collection2,test-type,123,TRUE,,profile_2,profile_type_2,,Test variant,test-sku-3,test-barcode-3,10,FALSE,TRUE,,,,,,,,,,120,,,test-option,Option 1 Value blue,,,test-image.png,,,,
1 Product id Product Handle Product Title Product Subtitle Product Description Product Status Product Thumbnail Product Weight Product Length Product Width Product Height Product HS Code Product Origin Country Product Mid Code Product Material Product Collection Title Product Collection Handle Product Type Product Tags Product Discountable Product External ID Product Profile Name Product Profile Type Variant id Variant Title Variant SKU Variant Barcode Variant Inventory Quantity Variant Allow backorder Variant Manage inventory Variant Weight Variant Length Variant Width Variant Height Variant HS Code Variant Origin Country Variant Mid Code Variant Material Price ImportLand [EUR] Price USD Price denmark [DKK] Price Denmark [DKK] Option 1 Name Option 1 Value Option 2 Name Option 2 Value Image 1 Url Sales Channel 1 Name Sales Channel 2 Name Sales Channel 1 Id Sales Channel 2 Id
2 O6S1YQ6mKm test-product-product-1 Test product test-product-description-1 draft Test collection 1 test-collection1 test-type-1 123_1 TRUE profile_1 profile_type_1 Test variant test-sku-1 test-barcode-1 10 FALSE TRUE 100 110 130 test-option-1 option 1 value red test-option-2 option 2 value 1 test-image.png Import Sales Channel 1 Import Sales Channel 2
3 5VxiEkmnPV test-product-product-2 Test product test-product-description draft Test collection test-collection2 test-type 123 TRUE profile_2 profile_type_2 Test variant test-sku-2 test-barcode-2 10 FALSE TRUE 110 test-option Option 1 value 1 test-image.png
4 5VxiEkmnPV test-product-product-2 Test product test-product-description draft Test collection test-collection2 test-type 123 TRUE profile_2 profile_type_2 Test variant test-sku-3 test-barcode-3 10 FALSE TRUE 120 test-option Option 1 Value blue test-image.png

View File

@@ -1,7 +1,13 @@
const setupServer = require("./setup-server")
const { initDb } = require("./use-db")
const startServerWithEnvironment = async ({ cwd, verbose, env }) => {
const startServerWithEnvironment = async ({
cwd,
redisUrl,
uploadDir,
verbose,
env,
}) => {
if (env) {
Object.entries(env).forEach(([key, value]) => {
process.env[key] = value
@@ -19,6 +25,8 @@ const startServerWithEnvironment = async ({ cwd, verbose, env }) => {
const medusaProcess = await setupServer({
cwd,
verbose,
redisUrl,
uploadDir,
env,
})

View File

@@ -60,22 +60,26 @@ class MinioService extends AbstractFileService {
}
return await Promise.all([
s3.deleteObject({ ...params, Bucket: this.bucket_ }, (err, data) => {
if (err) {
reject(err)
return
}
resolve(data)
}),
s3.deleteObject(
{ ...params, Bucket: this.private_bucket_ },
(err, data) => {
new Promise((resolve, reject) =>
s3.deleteObject({ ...params, Bucket: this.bucket_ }, (err, data) => {
if (err) {
reject(err)
return
}
resolve(data)
}
})
),
new Promise((resolve, reject) =>
s3.deleteObject(
{ ...params, Bucket: this.private_bucket_ },
(err, data) => {
if (err) {
reject(err)
return
}
resolve(data)
}
)
),
])
}

View File

@@ -2,15 +2,64 @@ import {
Brackets,
DeleteResult,
EntityRepository,
FindManyOptions,
In,
Repository,
} from "typeorm"
import { SalesChannel } from "../models"
import { ExtendedFindConfig, Selector } from "../types/common";
import { ExtendedFindConfig, Selector } from "../types/common"
import { flatten, groupBy, merge } from "lodash"
@EntityRepository(SalesChannel)
export class SalesChannelRepository extends Repository<SalesChannel> {
public async getFreeTextSearchResultsAndCount(
/**
 * Find sales channels either by a list of ids or by generic find options,
 * loading the requested relations via separate queries that are merged
 * back onto the base entities afterwards.
 *
 * @param relations - relation paths to load (dot notation supported)
 * @param idsOrOptionsWithoutRelations - array of ids, or find options
 *   without a `relations` key
 * @returns tuple of [entities with relations merged in, total count]
 */
public async findWithRelations(
  relations: (keyof SalesChannel | string)[] = [],
  idsOrOptionsWithoutRelations:
    | Omit<FindManyOptions<SalesChannel>, "relations">
    | string[] = {}
): Promise<[SalesChannel[], number]> {
  let entities: SalesChannel[] = []
  let count = 0
  if (Array.isArray(idsOrOptionsWithoutRelations)) {
    entities = await this.findByIds(idsOrOptionsWithoutRelations)
    // NOTE(review): assumes every requested id exists — `count` may
    // exceed the number of rows actually found. Confirm callers are ok
    // with that.
    count = idsOrOptionsWithoutRelations.length
  } else {
    const [results, resultCount] = await this.findAndCount(
      idsOrOptionsWithoutRelations
    )
    entities = results
    count = resultCount
  }

  const entitiesIds = entities.map(({ id }) => id)

  // Group relation paths by their top-level relation so each top-level
  // relation is fetched with one query instead of a single multi-join.
  const groupedRelations = {}
  for (const rel of relations) {
    const [topLevel] = rel.split(".")
    if (groupedRelations[topLevel]) {
      groupedRelations[topLevel].push(rel)
    } else {
      groupedRelations[topLevel] = [rel]
    }
  }

  // One query per top-level relation group, selecting only the id plus
  // the relation data.
  const entitiesIdsWithRelations = await Promise.all(
    Object.entries(groupedRelations).map(([_, rels]) => {
      return this.findByIds(entitiesIds, {
        select: ["id"],
        relations: rels as string[],
      })
    })
  ).then(flatten)

  // Merge the relation-only rows with the base entities, keyed by id.
  const entitiesAndRelations = entitiesIdsWithRelations.concat(entities)
  const entitiesAndRelationsById = groupBy(entitiesAndRelations, "id")
  return [
    Object.values(entitiesAndRelationsById).map((v) => merge({}, ...v)),
    count,
  ]
}
public async getFreeTextSearchResultsAndCount(
q: string,
options: ExtendedFindConfig<SalesChannel, Selector<SalesChannel>> = {
where: {},
@@ -70,4 +119,21 @@ export class SalesChannelRepository extends Repository<SalesChannel> {
.orIgnore()
.execute()
}
/**
 * Fetch a single sales channel together with the requested relations.
 *
 * @param relations - relations to load onto the result
 * @param optionsWithoutRelations - find options (a `take` of 1 is forced)
 * @returns the first matching sales channel
 */
public async findOneWithRelations(
  relations: Array<keyof SalesChannel> = [],
  optionsWithoutRelations: Omit<
    FindManyOptions<SalesChannel>,
    "relations"
  > = {}
): Promise<SalesChannel> {
  // Restrict the underlying query to a single record.
  optionsWithoutRelations.take = 1

  const [channels] = await this.findWithRelations(
    relations,
    optionsWithoutRelations
  )
  return channels[0]
}
}

View File

@@ -1,11 +1,10 @@
import { IdMap, MockManager, MockRepository } from "medusa-test-utils"
import SalesChannelService from "../sales-channel"
import { EventBusServiceMock } from "../__mocks__/event-bus"
import { EventBusService, ProductService, StoreService } from "../index"
import { FindConditions, FindOneOptions } from "typeorm"
import { EventBusService, StoreService } from "../index"
import { FindConditions, FindManyOptions, FindOneOptions } from "typeorm"
import { SalesChannel } from "../../models"
import { ProductServiceMock } from "../__mocks__/product";
import { store, StoreServiceMock } from "../__mocks__/store";
import { store, StoreServiceMock } from "../__mocks__/store"
describe("SalesChannelService", () => {
const salesChannelData = {
@@ -16,56 +15,79 @@ describe("SalesChannelService", () => {
const salesChannelRepositoryMock = {
...MockRepository({
findOneWithRelations: jest
.fn()
.mockImplementation(
(
relations: Array<keyof SalesChannel> = [],
optionsWithoutRelations: Omit<
FindManyOptions<SalesChannel>,
"relations"
>
): any => {
return Promise.resolve({
id:
(optionsWithoutRelations?.where as FindConditions<SalesChannel>)
?.id ?? IdMap.getId("sc_adjhlukiaeswhfae"),
...salesChannelData,
})
}
),
findOne: jest
.fn()
.mockImplementation(
(queryOrId: string | FindOneOptions<SalesChannel>): any => {
return Promise.resolve({
id:
typeof queryOrId === "string"
? queryOrId
: (queryOrId?.where as FindConditions<SalesChannel>)?.id ??
IdMap.getId("sc_adjhlukiaeswhfae"),
...salesChannelData,
})
}
),
.fn()
.mockImplementation(
(queryOrId: string | FindOneOptions<SalesChannel>): any => {
return Promise.resolve({
id:
typeof queryOrId === "string"
? queryOrId
: (queryOrId?.where as FindConditions<SalesChannel>)?.id ??
IdMap.getId("sc_adjhlukiaeswhfae"),
...salesChannelData,
})
}
),
findAndCount: jest.fn().mockImplementation(() =>
Promise.resolve([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData
...salesChannelData,
},
]),
])
),
create: jest.fn().mockImplementation((data) => data),
save: (salesChannel) => Promise.resolve({
id: IdMap.getId("sales_channel_1"),
...salesChannel
}),
save: (salesChannel) =>
Promise.resolve({
id: IdMap.getId("sales_channel_1"),
...salesChannel,
}),
softRemove: jest.fn().mockImplementation((id: string): any => {
return Promise.resolve()
return Promise.resolve()
}),
}),
getFreeTextSearchResultsAndCount: jest.fn().mockImplementation(() =>
Promise.resolve([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData
...salesChannelData,
},
])
),
removeProducts: jest.fn().mockImplementation((id: string, productIds: string[]): any => {
Promise.resolve([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData
},
])
}),
addProducts: jest.fn().mockImplementation((id: string, productIds: string[]): any => {
return Promise.resolve()
}),
removeProducts: jest
.fn()
.mockImplementation((id: string, productIds: string[]): any => {
Promise.resolve([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData,
},
])
}),
addProducts: jest
.fn()
.mockImplementation((id: string, productIds: string[]): any => {
return Promise.resolve()
}),
}
describe("create default", async () => {
@@ -145,8 +167,9 @@ describe("SalesChannelService", () => {
...salesChannelData,
})
expect(salesChannelRepositoryMock.findOne).toHaveBeenCalledTimes(1)
expect(salesChannelRepositoryMock.findOne).toHaveBeenLastCalledWith({
expect(
salesChannelRepositoryMock.findOneWithRelations
).toHaveBeenLastCalledWith(undefined, {
where: { id: IdMap.getId("sales_channel_1") },
})
})
@@ -206,48 +229,53 @@ describe("SalesChannelService", () => {
expect(salesChannel).toBeTruthy()
expect(salesChannel).toEqual(
expect.arrayContaining([{
id: IdMap.getId("sales_channel_1"),
...salesChannelData,
}])
expect.arrayContaining([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData,
},
])
)
expect(salesChannelRepositoryMock.findAndCount).toHaveBeenCalledTimes(0)
expect(salesChannelRepositoryMock.getFreeTextSearchResultsAndCount).toHaveBeenCalledTimes(1)
expect(salesChannelRepositoryMock.getFreeTextSearchResultsAndCount).toHaveBeenLastCalledWith(
q,
{
skip: 0,
take: 20,
where: {},
}
)
expect(
salesChannelRepositoryMock.getFreeTextSearchResultsAndCount
).toHaveBeenCalledTimes(1)
expect(
salesChannelRepositoryMock.getFreeTextSearchResultsAndCount
).toHaveBeenLastCalledWith(q, {
skip: 0,
take: 20,
where: {},
})
})
it("should retrieve a sales channel using find and count", async () => {
const salesChannel = await salesChannelService.listAndCount({
id: IdMap.getId("sales_channel_1")
id: IdMap.getId("sales_channel_1"),
})
expect(salesChannel).toBeTruthy()
expect(salesChannel).toEqual(
expect.arrayContaining([{
id: IdMap.getId("sales_channel_1"),
...salesChannelData,
}])
expect.arrayContaining([
{
id: IdMap.getId("sales_channel_1"),
...salesChannelData,
},
])
)
expect(salesChannelRepositoryMock.getFreeTextSearchResultsAndCount).toHaveBeenCalledTimes(0)
expect(
salesChannelRepositoryMock.getFreeTextSearchResultsAndCount
).toHaveBeenCalledTimes(0)
expect(salesChannelRepositoryMock.findAndCount).toHaveBeenCalledTimes(1)
expect(salesChannelRepositoryMock.findAndCount).toHaveBeenLastCalledWith(
{
skip: 0,
take: 20,
where: {
id: IdMap.getId("sales_channel_1"),
},
}
)
expect(salesChannelRepositoryMock.findAndCount).toHaveBeenLastCalledWith({
skip: 0,
take: 20,
where: {
id: IdMap.getId("sales_channel_1"),
},
})
})
})
@@ -318,7 +346,7 @@ describe("SalesChannelService", () => {
jest.clearAllMocks()
})
it('should remove a list of product to a sales channel', async () => {
it("should remove a list of product to a sales channel", async () => {
const salesChannel = await salesChannelService.removeProducts(
IdMap.getId("sales_channel_1"),
[IdMap.getId("sales_channel_1_product_1")]
@@ -349,7 +377,7 @@ describe("SalesChannelService", () => {
jest.clearAllMocks()
})
it('should add a list of product to a sales channel', async () => {
it("should add a list of product to a sales channel", async () => {
const salesChannel = await salesChannelService.addProducts(
IdMap.getId("sales_channel_1"),
[IdMap.getId("sales_channel_1_product_1")]

View File

@@ -53,6 +53,9 @@ class CsvParser<
): Promise<TOutputResult> {
let outputTuple = {} as TOutputResult
const columnMap = this.buildColumnMap_(this.$$schema.columns)
const requiredColumnsMap = this.buildColumnMap_(
this.$$schema.columns.filter((col) => col.required)
)
const tupleKeys = Object.keys(line)
@@ -95,10 +98,10 @@ class CsvParser<
}
/**
* missing columns = columns defined in the schema - columns present in the line
* missing columns = columns defined (& required) in the schema - columns present in the line
*/
const missingColumns = difference(
Object.keys(columnMap),
Object.keys(requiredColumnsMap),
Object.keys(processedColumns)
)

View File

@@ -2,7 +2,7 @@ import {
CreateSalesChannelInput,
UpdateSalesChannelInput,
} from "../types/sales-channels"
import { FindConfig, QuerySelector } from "../types/common"
import { FindConfig, QuerySelector, Selector } from "../types/common"
import { EntityManager } from "typeorm"
import EventBusService from "./event-bus"
@@ -54,9 +54,49 @@ class SalesChannelService extends TransactionBaseService {
this.storeService_ = storeService
}
/**
 * A generic retrieve used to find a sales channel by different attributes.
 *
 * @param selector - SC selector
 * @param config - find config
 * @returns a single SC matching the query or throws
 */
protected async retrieve_(
  selector: Selector<SalesChannel>,
  config: FindConfig<SalesChannel> = {}
): Promise<SalesChannel> {
  const manager = this.manager_
  const salesChannelRepo = manager.getCustomRepository(
    this.salesChannelRepository_
  )

  const { relations, ...query } = buildQuery(selector, config)

  const salesChannel = await salesChannelRepo.findOneWithRelations(
    relations as (keyof SalesChannel)[],
    query
  )

  if (!salesChannel) {
    // Build a readable "key: value" list of the selector constraints.
    // BUGFIX: Object.entries yields [key, value] tuples, so the callback
    // must destructure the tuple — `(key, value)` bound the whole tuple
    // to `key` and the array index to `value`, garbling the message.
    const selectorConstraints = Object.entries(selector)
      .map(([key, value]) => `${key}: ${value}`)
      .join(", ")

    throw new MedusaError(
      MedusaError.Types.NOT_FOUND,
      `Sales channel with ${selectorConstraints} was not found`
    )
  }

  return salesChannel
}
/**
* Retrieve a SalesChannel by id
*
* @param salesChannelId - id of the channel to retrieve
* @param config - SC config
* @experimental This feature is under development and may change in the future.
* To use this feature please enable the corresponding feature flag in your medusa backend project.
* @returns a sales channel
@@ -65,28 +105,21 @@ class SalesChannelService extends TransactionBaseService {
salesChannelId: string,
config: FindConfig<SalesChannel> = {}
): Promise<SalesChannel | never> {
const manager = this.manager_
const salesChannelRepo = manager.getCustomRepository(
this.salesChannelRepository_
)
return await this.retrieve_({ id: salesChannelId }, config)
}
const query = buildQuery(
{
id: salesChannelId,
},
config
)
const salesChannel = await salesChannelRepo.findOne(query)
if (!salesChannel) {
throw new MedusaError(
MedusaError.Types.NOT_FOUND,
`Sales channel with id ${salesChannelId} was not found`
)
}
return salesChannel
/**
 * Find a sales channel by name.
 *
 * @param name - name of the sales channel
 * @param config - find config
 * @return the sales channel with the matching name, or throws if none exists
 */
async retrieveByName(
  name: string,
  config: FindConfig<SalesChannel> = {}
): Promise<SalesChannel> {
  // Return type narrowed from `SalesChannel | unknown` (which collapses
  // to `unknown` and forced callers to cast) to `SalesChannel`.
  return await this.retrieve_({ name }, config)
}
/**

View File

@@ -15,6 +15,7 @@ import {
ShippingProfileService,
} from "../../../../services"
import { InjectedProps } from "../../../batch-jobs/product/types"
import { FlagRouter } from "../../../../utils/flag-router"
let fakeJob = {
id: IdMap.getId("product-import-job"),
@@ -142,6 +143,7 @@ describe("Product import strategy", () => {
productVariantService:
productVariantServiceMock as unknown as ProductVariantService,
regionService: regionServiceMock as unknown as RegionService,
featureFlagRouter: new FlagRouter({}),
} as unknown as InjectedProps)
it("`preProcessBatchJob` should generate import ops and upload them to a bucket using the file service", async () => {
@@ -160,14 +162,14 @@ describe("Product import strategy", () => {
1,
{
ext: "json",
name: `imports/products/import/ops/${fakeJob.id}-PRODUCT_CREATE`,
name: `imports/products/ops/${fakeJob.id}-PRODUCT_CREATE`,
}
)
expect(fileServiceMock.getUploadStreamDescriptor).toHaveBeenNthCalledWith(
2,
{
ext: "json",
name: `imports/products/import/ops/${fakeJob.id}-VARIANT_UPDATE`, // because row data has variant.id
name: `imports/products/ops/${fakeJob.id}-VARIANT_UPDATE`, // because row data has variant.id
}
)

View File

@@ -9,9 +9,10 @@ import {
ProductService,
ProductVariantService,
RegionService,
SalesChannelService,
ShippingProfileService,
} from "../../../services"
import { CreateProductInput } from "../../../types/product"
import { CreateProductInput, UpdateProductInput } from "../../../types/product"
import {
CreateProductVariantInput,
UpdateProductVariantInput,
@@ -24,7 +25,10 @@ import {
TBuiltProductImportLine,
TParsedProductImportRowData,
} from "./types"
import { SalesChannel } from "../../../models"
import { FlagRouter } from "../../../utils/flag-router"
import { transformProductData, transformVariantData } from "./utils"
import SalesChannelFeatureFlag from "../../../loaders/feature-flags/sales-channels"
/**
* Process this many variant rows before reporting progress.
@@ -35,22 +39,25 @@ const BATCH_SIZE = 100
* Default strategy class used for a batch import of products/variants.
*/
class ProductImportStrategy extends AbstractBatchJobStrategy {
static identifier = "product-import"
static identifier = "product-import-strategy"
static batchType = "product_import"
static batchType = "product-import"
private processedCounter: Record<string, number> = {}
protected readonly featureFlagRouter_: FlagRouter
protected manager_: EntityManager
protected transactionManager_: EntityManager | undefined
protected readonly fileService_: IFileService
protected readonly regionService_: RegionService
protected readonly productService_: ProductService
protected readonly batchJobService_: BatchJobService
protected readonly salesChannelService_: SalesChannelService
protected readonly productVariantService_: ProductVariantService
protected readonly shippingProfileService_: ShippingProfileService
protected readonly regionService_: RegionService
protected readonly csvParser_: CsvParser<
ProductImportCsvSchema,
@@ -61,21 +68,36 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
constructor({
batchJobService,
productService,
salesChannelService,
productVariantService,
shippingProfileService,
regionService,
fileService,
manager,
featureFlagRouter,
}: InjectedProps) {
// eslint-disable-next-line prefer-rest-params
super(arguments[0])
this.csvParser_ = new CsvParser(CSVSchema)
const isSalesChannelsFeatureOn = featureFlagRouter.isFeatureEnabled(
SalesChannelFeatureFlag.key
)
this.csvParser_ = new CsvParser({
...CSVSchema,
columns: [
...CSVSchema.columns,
...(isSalesChannelsFeatureOn ? SalesChannelsSchema.columns : []),
],
})
this.featureFlagRouter_ = featureFlagRouter
this.manager_ = manager
this.fileService_ = fileService
this.batchJobService_ = batchJobService
this.productService_ = productService
this.salesChannelService_ = salesChannelService
this.productVariantService_ = productVariantService
this.shippingProfileService_ = shippingProfileService
this.regionService_ = regionService
@@ -89,15 +111,18 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
* Create a description of a row on which the error occurred and throw a Medusa error.
*
* @param row - Parsed CSV row data
* @param errorDescription - Concrete error
*/
protected static throwDescriptiveError(
row: TParsedProductImportRowData
row: TParsedProductImportRowData,
errorDescription?: string
): never {
const message = `Error while processing row with:
product id: ${row["product.id"]},
product handle: ${row["product.handle"]},
variant id: ${row["variant.id"]}
variant sku: ${row["variant.sku"]}`
variant sku: ${row["variant.sku"]}
${errorDescription}`
throw new MedusaError(MedusaError.Types.INVALID_DATA, message)
}
@@ -168,18 +193,18 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
}
if (price.regionName) {
const region = await this.regionService_
.withTransaction(transactionManager)
.retrieveByName(price.regionName)
if (!region) {
try {
record.region_id = (
await this.regionService_
.withTransaction(transactionManager)
.retrieveByName(price.regionName)
)?.id
} catch (e) {
throw new MedusaError(
MedusaError.Types.INVALID_DATA,
`Trying to set a price for a region ${price.regionName} that doesn't exist`
)
}
record.region_id = region!.id
} else {
record.currency_code = price.currency_code
}
@@ -255,6 +280,49 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
})
}
/**
 * Resolve sales channels from the input data: by id when one is provided,
 * falling back to a lookup by name.
 *
 * NOTE: Sales channel names provided in the CSV must exist in the DB.
 * New sales channels will not be created.
 *
 * @param data an array of sales channel partials (id and/or name)
 * @return an array of the resolved sales channels
 */
private async processSalesChannels(
  data: Pick<SalesChannel, "name" | "id">[]
): Promise<SalesChannel[]> {
  const transactionManager = this.transactionManager_ ?? this.manager_
  const salesChannelServiceTx =
    this.salesChannelService_.withTransaction(transactionManager)

  const salesChannels: SalesChannel[] = []

  for (const input of data) {
    let channel: SalesChannel | null = null

    // Prefer an id lookup when the CSV row supplied one.
    if (input.id) {
      try {
        channel = await salesChannelServiceTx.retrieve(input.id, {
          select: ["id"],
        })
      } catch (e) {
        // noop - check if the channel exists with provided name
      }
    }

    // Fall back to a name lookup; the service throws NOT_FOUND when no
    // channel with that name exists.
    if (!channel) {
      channel = (await salesChannelServiceTx.retrieveByName(input.name, {
        select: ["id"],
      })) as SalesChannel
    }

    salesChannels.push(channel)
  }

  return salesChannels
}
/**
* Method creates products using `ProductService` and parsed data from a CSV row.
*
@@ -267,15 +335,31 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
OperationType.ProductCreate
)
const productServiceTx =
this.productService_.withTransaction(transactionManager)
const isSalesChannelsFeatureOn = this.featureFlagRouter_.isFeatureEnabled(
SalesChannelFeatureFlag.key
)
for (const productOp of productOps) {
const productData = transformProductData(
productOp
) as unknown as CreateProductInput
try {
await this.productService_
.withTransaction(transactionManager)
.create(
transformProductData(productOp) as unknown as CreateProductInput
if (isSalesChannelsFeatureOn) {
productData["sales_channels"] = await this.processSalesChannels(
productOp["product.sales_channels"] as Pick<
SalesChannel,
"name" | "id"
>[]
)
}
await productServiceTx.create(productData)
} catch (e) {
ProductImportStrategy.throwDescriptiveError(productOp)
ProductImportStrategy.throwDescriptiveError(productOp, e.message)
}
this.updateProgress(batchJobId)
@@ -294,16 +378,31 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
OperationType.ProductUpdate
)
const productServiceTx =
this.productService_.withTransaction(transactionManager)
const isSalesChannelsFeatureOn = this.featureFlagRouter_.isFeatureEnabled(
SalesChannelFeatureFlag.key
)
for (const productOp of productOps) {
const productData = transformProductData(productOp) as UpdateProductInput
try {
await this.productService_
.withTransaction(transactionManager)
.update(
productOp["product.id"] as string,
transformProductData(productOp)
if (isSalesChannelsFeatureOn) {
productData["sales_channels"] = await this.processSalesChannels(
productOp["product.sales_channels"] as Pick<
SalesChannel,
"name" | "id"
>[]
)
}
await productServiceTx.update(
productOp["product.id"] as string,
productData
)
} catch (e) {
ProductImportStrategy.throwDescriptiveError(productOp)
ProductImportStrategy.throwDescriptiveError(productOp, e.message)
}
this.updateProgress(batchJobId)
@@ -355,7 +454,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
this.updateProgress(batchJobId)
} catch (e) {
ProductImportStrategy.throwDescriptiveError(variantOp)
ProductImportStrategy.throwDescriptiveError(variantOp, e.message)
}
}
}
@@ -391,7 +490,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
transformVariantData(variantOp) as UpdateProductVariantInput
)
} catch (e) {
ProductImportStrategy.throwDescriptiveError(variantOp)
ProductImportStrategy.throwDescriptiveError(variantOp, e.message)
}
this.updateProgress(batchJobId)
@@ -437,13 +536,14 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const { writeStream, promise } = await this.fileService_
.withTransaction(transactionManager)
.getUploadStreamDescriptor({
name: `imports/products/import/ops/${batchJobId}-${op}`,
name: `imports/products/ops/${batchJobId}-${op}`,
ext: "json",
})
uploadPromises.push(promise)
writeStream.write(JSON.stringify(results[op]))
writeStream.end()
}
}
@@ -466,8 +566,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
const readableStream = await this.fileService_
.withTransaction(transactionManager)
.getDownloadStream({
fileKey: `imports/products/import/ops/${batchJobId}-${op}`,
ext: "json",
fileKey: `imports/products/ops/${batchJobId}-${op}.json`,
})
return await new Promise((resolve) => {
@@ -477,9 +576,10 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
readableStream.on("end", () => {
resolve(JSON.parse(data))
})
readableStream.on("error", () =>
readableStream.on("error", () => {
// TODO: maybe should throw
resolve([] as TParsedProductImportRowData[])
)
})
})
}
@@ -494,7 +594,7 @@ class ProductImportStrategy extends AbstractBatchJobStrategy {
for (const op of Object.keys(OperationType)) {
try {
this.fileService_.withTransaction(transactionManager).delete({
fileKey: `imports/products/import/ops/-${batchJobId}-${op}`,
fileKey: `imports/products/ops/-${batchJobId}-${op}`,
})
} catch (e) {
// noop
@@ -659,6 +759,7 @@ const CSVSchema: ProductImportCsvSchema = {
return builtLine
},
},
// PRICES
{
name: "Price Region",
@@ -730,3 +831,52 @@ const CSVSchema: ProductImportCsvSchema = {
},
],
}
// Extra columns appended to the product import CSV schema when the
// sales-channels feature flag is enabled. Each reducer accumulates
// entries onto `product.sales_channels` on the built line.
const SalesChannelsSchema: ProductImportCsvSchema = {
  columns: [
    {
      name: "Sales Channel Name",
      // Matches numbered columns such as "Sales Channel 1 Name".
      match: /Sales Channel \d+ Name/,
      reducer: (builtLine, key, value): TBuiltProductImportLine => {
        builtLine["product.sales_channels"] =
          builtLine["product.sales_channels"] || []

        // Skip empty cells without discarding entries collected so far.
        if (typeof value === "undefined" || value === null) {
          return builtLine
        }

        // Leading semicolon guards against ASI joining this statement
        // with the preceding line.
        ;(
          builtLine["product.sales_channels"] as Record<
            string,
            string | number
          >[]
        ).push({
          name: value,
        })

        return builtLine
      },
    },
    {
      name: "Sales Channel Id",
      // Matches numbered columns such as "Sales Channel 1 Id".
      match: /Sales Channel \d+ Id/,
      reducer: (builtLine, key, value): TBuiltProductImportLine => {
        builtLine["product.sales_channels"] =
          builtLine["product.sales_channels"] || []

        // Skip empty cells without discarding entries collected so far.
        if (typeof value === "undefined" || value === null) {
          return builtLine
        }

        // NOTE(review): a row with both a Name and an Id cell appends
        // two separate entries ({name} and {id}) rather than one merged
        // record — confirm downstream handling expects that.
        ;(
          builtLine["product.sales_channels"] as Record<
            string,
            string | number
          >[]
        ).push({
          id: value,
        })

        return builtLine
      },
    },
  ],
}

View File

@@ -6,10 +6,11 @@ import {
ProductService,
ProductVariantService,
RegionService,
SalesChannelService,
ShippingProfileService,
} from "../../../services"
import { ProductOptionRepository } from "../../../repositories/product-option"
import { CsvSchema } from "../../../interfaces/csv-parser"
import { FlagRouter } from "../../../utils/flag-router"
/**
* DI props for the Product import strategy
@@ -19,9 +20,11 @@ export type InjectedProps = {
productService: ProductService
productVariantService: ProductVariantService
shippingProfileService: ShippingProfileService
salesChannelService: SalesChannelService
regionService: RegionService
fileService: typeof FileService
featureFlagRouter: FlagRouter
manager: EntityManager
}