feat(medusa): Move search indexing into a separate subscriber to defer the work load (#1874)
**What**
Move the preliminary indexing action at boot time to a separate subscriber in order to defer the work load in the background and therefore to avoid increasing the load time when the number of products increase with time.
**Tests**
Add 10k products (since it is our limit, tried with 50k before getting the error limit) using
```sal
do $$
declare
counter integer := 0;
begin
while counter < 10000 loop
INSERT INTO product (id, title, description, handle, profile_id)
(SELECT * FROM ((SELECT random(), random(), random(), random(), 'sp_01FNB9K7FXB0SZMKXD013RJYSP')) as T);
counter := counter + 1;
end loop;
end$$;
```
then start the server and while the server is starting, hit the search end point repeatedly
FIXES CORE-258
Co-authored-by: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
0e0b131488
commit
b8ddb31f6f
6
.changeset/large-bottles-jam.md
Normal file
6
.changeset/large-bottles-jam.md
Normal file
@@ -0,0 +1,6 @@
|
||||
---
|
||||
"medusa-interfaces": patch
|
||||
"@medusajs/medusa": patch
|
||||
---
|
||||
|
||||
Move search indexing into a separate subscriber to defer the work load
|
||||
@@ -9,6 +9,10 @@ class SearchService extends BaseService {
|
||||
super()
|
||||
}
|
||||
|
||||
get options() {
|
||||
return this.options_ ?? {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to create an index
|
||||
* @param indexName {string} - the index name
|
||||
@@ -32,7 +36,7 @@ class SearchService extends BaseService {
|
||||
* Used to index documents by the search engine provider
|
||||
* @param indexName {string} - the index name
|
||||
* @param documents {Array.<Object>} - documents array to be indexed
|
||||
* @param type {Array.<Object>} - type of documents to be added (e.g: products, regions, orders, etc)
|
||||
* @param type {string} - type of documents to be added (e.g: products, regions, orders, etc)
|
||||
* @return {Promise<{object}>} - returns response from search engine provider
|
||||
*/
|
||||
addDocuments(indexName, documents, type) {
|
||||
|
||||
@@ -179,7 +179,7 @@ export default async ({
|
||||
const searchActivity = Logger.activity("Initializing search engine indexing")
|
||||
track("SEARCH_ENGINE_INDEXING_STARTED")
|
||||
await searchIndexLoader({ container })
|
||||
const searchAct = Logger.success(searchActivity, "Indexing completed") || {}
|
||||
const searchAct = Logger.success(searchActivity, "Indexing event emitted") || {}
|
||||
track("SEARCH_ENGINE_INDEXING_COMPLETED", { duration: searchAct.duration })
|
||||
|
||||
return { container, dbConnection, app: expressApp }
|
||||
|
||||
@@ -1,67 +1,19 @@
|
||||
import ProductService from "../services/product"
|
||||
import { indexTypes } from "medusa-core-utils"
|
||||
import { MedusaContainer } from "../types/global"
|
||||
import DefaultSearchService from "../services/search"
|
||||
import { Logger } from "../types/global"
|
||||
import { EventBusService } from "../services"
|
||||
|
||||
async function loadProductsIntoSearchEngine(
|
||||
container: MedusaContainer
|
||||
): Promise<void> {
|
||||
const searchService = container.resolve<DefaultSearchService>("searchService")
|
||||
const productService = container.resolve<ProductService>("productService")
|
||||
export const SEARCH_INDEX_EVENT = "SEARCH_INDEX_EVENT"
|
||||
|
||||
const TAKE = 20
|
||||
let hasMore = true
|
||||
|
||||
let lastSeenId = ""
|
||||
|
||||
while (hasMore) {
|
||||
const products = await productService.list(
|
||||
{ id: { gt: lastSeenId } },
|
||||
{
|
||||
select: [
|
||||
"id",
|
||||
"title",
|
||||
"status",
|
||||
"subtitle",
|
||||
"description",
|
||||
"handle",
|
||||
"is_giftcard",
|
||||
"discountable",
|
||||
"thumbnail",
|
||||
"profile_id",
|
||||
"collection_id",
|
||||
"type_id",
|
||||
"origin_country",
|
||||
"created_at",
|
||||
"updated_at",
|
||||
],
|
||||
relations: [
|
||||
"variants",
|
||||
"tags",
|
||||
"type",
|
||||
"collection",
|
||||
"variants.prices",
|
||||
"images",
|
||||
"variants.options",
|
||||
"options",
|
||||
],
|
||||
take: TAKE,
|
||||
order: { id: "ASC" },
|
||||
}
|
||||
function loadProductsIntoSearchEngine(container: MedusaContainer): void {
|
||||
const logger: Logger = container.resolve<Logger>("logger")
|
||||
const eventBusService: EventBusService = container.resolve("eventBusService")
|
||||
eventBusService.emit(SEARCH_INDEX_EVENT, {}).catch((err) => {
|
||||
logger.error(err)
|
||||
logger.error(
|
||||
"Something went wrong while emitting the search indexing event."
|
||||
)
|
||||
|
||||
if (products.length > 0) {
|
||||
await searchService.addDocuments(
|
||||
ProductService.IndexName,
|
||||
products,
|
||||
indexTypes.products
|
||||
)
|
||||
lastSeenId = products[products.length - 1].id
|
||||
} else {
|
||||
hasMore = false
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export default async ({
|
||||
|
||||
@@ -5,12 +5,17 @@ import { SearchService } from "medusa-interfaces"
|
||||
* @extends SearchService
|
||||
*/
|
||||
class DefaultSearchService extends SearchService {
|
||||
constructor(container) {
|
||||
constructor(container, options) {
|
||||
super()
|
||||
|
||||
this.isDefault = true
|
||||
|
||||
this.logger_ = container.logger
|
||||
this.options_ = options
|
||||
}
|
||||
|
||||
get options() {
|
||||
return this.options_
|
||||
}
|
||||
|
||||
createIndex(indexName, options) {
|
||||
|
||||
94
packages/medusa/src/subscribers/search-indexing.ts
Normal file
94
packages/medusa/src/subscribers/search-indexing.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import EventBusService from "../services/event-bus"
|
||||
import { SEARCH_INDEX_EVENT } from "../loaders/search-index"
|
||||
import ProductService from "../services/product"
|
||||
import { indexTypes } from "medusa-core-utils"
|
||||
import { Product } from "../models"
|
||||
import { SearchService } from "../services"
|
||||
|
||||
type InjectedDependencies = {
|
||||
eventBusService: EventBusService
|
||||
searchService: SearchService
|
||||
productService: ProductService
|
||||
}
|
||||
|
||||
class SearchIndexingSubscriber {
|
||||
private readonly eventBusService_: EventBusService
|
||||
private readonly searchService_: SearchService
|
||||
private readonly productService_: ProductService
|
||||
|
||||
constructor({
|
||||
eventBusService,
|
||||
searchService,
|
||||
productService,
|
||||
}: InjectedDependencies) {
|
||||
this.eventBusService_ = eventBusService
|
||||
this.searchService_ = searchService
|
||||
this.productService_ = productService
|
||||
|
||||
this.eventBusService_.subscribe(SEARCH_INDEX_EVENT, this.indexDocuments)
|
||||
}
|
||||
|
||||
indexDocuments = async (): Promise<void> => {
|
||||
const TAKE = this.searchService_?.options?.batch_size ?? 1000
|
||||
let hasMore = true
|
||||
|
||||
let lastSeenId = ""
|
||||
|
||||
while (hasMore) {
|
||||
const products = await this.retrieveNextProducts(lastSeenId, TAKE)
|
||||
|
||||
if (products.length > 0) {
|
||||
await this.searchService_.addDocuments(
|
||||
ProductService.IndexName,
|
||||
products,
|
||||
indexTypes.products
|
||||
)
|
||||
lastSeenId = products[products.length - 1].id
|
||||
} else {
|
||||
hasMore = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected async retrieveNextProducts(
|
||||
lastSeenId: string,
|
||||
take: number
|
||||
): Promise<Product[]> {
|
||||
return await this.productService_.list(
|
||||
{ id: { gt: lastSeenId } },
|
||||
{
|
||||
select: [
|
||||
"id",
|
||||
"title",
|
||||
"status",
|
||||
"subtitle",
|
||||
"description",
|
||||
"handle",
|
||||
"is_giftcard",
|
||||
"discountable",
|
||||
"thumbnail",
|
||||
"profile_id",
|
||||
"collection_id",
|
||||
"type_id",
|
||||
"origin_country",
|
||||
"created_at",
|
||||
"updated_at",
|
||||
],
|
||||
relations: [
|
||||
"variants",
|
||||
"tags",
|
||||
"type",
|
||||
"collection",
|
||||
"variants.prices",
|
||||
"images",
|
||||
"variants.options",
|
||||
"options",
|
||||
],
|
||||
take: take,
|
||||
order: { id: "ASC" },
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
export default SearchIndexingSubscriber
|
||||
Reference in New Issue
Block a user