feat(medusa): Support batch-job API (#1547)

* feat(medusa): Implement confirm batch job end point

* feat(medusa): Implement confirm batch-job end point

* feat(medusa): Add create batch job end point and implementation

* feat(medusa): remove cancelled related

* feat(medusa): Remove unrelated content for batch job creation

* feat(medusa): Cleanup migration

* feat(medusa): Cleanup context

* fix(medusa): Batch-job service import

* feat(medusa): Update migration

* test(medusa): Fix batch job tests

* feat(medusa): Start batch job end point

* feat(medusa): Continue end points

* feat(medusa): Continue end points

* feat(medusa): Finalize batch-job end points

* style(medusa): Lint

* feat(medusa): Cleanup

* feat(medusa): Fix list-batch-job endpoint column selection

* feat(medusa): Batch-job feedback

* feat(medusa): Update create-batch-job endpoint doc

* test(integration-tests): Fix batch-job integration reguarding the response status code

* feat(medusa): Finalize rebase from develop

* feat(medusa): Extend batch job status with ready_at and failed_at

* feat(medusa): Update migration and tests accordingly

* feat(medusa): Update status order on batchJob

* feat(medusa): Enhance batchJobService status update

* style(medusa): Cleanup

* style(medusa): Typo

* style(medusa): Remove unnecessary comment

* cleanup(medusa): Address feedback

* test(integration-tests): Update naming and snapshots

* fix(medusa): Update validator

Co-authored-by: Philip Korsholm <88927411+pKorsholm@users.noreply.github.com>

* Fix(medusa): update validator

Co-authored-by: Philip Korsholm <88927411+pKorsholm@users.noreply.github.com>

* test(intergration-tests): Fix creates batch job

* test(integration-tests): Fix snapshot

* feat(medusa): Re-work status

* tests(integration-tests): Fix batch job

* feat(medusa): Addresses feedback

* fix(medusa): Revert package.json script

* feat(medusa/batch-job-api): Improve status management

* feat(medusa): Improve batch job status and remove some context validation from the service

* feat(medusa): BatchJob api merge params

* feat(medusa): Apply last changes on the batch job service

* Update packages/medusa/src/services/batch-job.ts

Co-authored-by: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com>

* feat(medusa): BatchJobStrategy and loaders (#1434)

* add batch job strategy interface

* update plugin loaders

* remove comment

* make map async

* ensure that only one of each strategy is registered

* register strategies plural

* add identifier and batchType properties

* extend batch job strategy identification method

* initial test

* update loaders to accomodate different ways of accessing batch job strategies

* identifier batch type field

* redo merge in plugins

* update interface and load only js files

* use switches instead of elif

* remove comments

* use static properties for strategy registration

* update tests

* fix unit tests

* update test names

* update isBatchJobStrategy method

* add check for TransactionBaseService in services for plugins

* update interfaces export

* update batchjob strategy interface with a prepare script

* update loaders

* update batchjob strategy interface

* remove everything but public interface methods from batchJobStrategy

* add default implementation to prepareBathJobForProcessing

* remove unused import

* docs: Add Services reference (#1548)

* added events reference

* add upgrade guide for 1.3.0

* Update 1-3-0.md

* merge 1.3.0 with 1.3.1

* rename to 1.3.0

* added paypal documentation

* Improve storefront quickstart documents

* chore(deps): bump sqlite3 from 5.0.2 to 5.0.3 (#1453)

Bumps [sqlite3](https://github.com/TryGhost/node-sqlite3) from 5.0.2 to 5.0.3.
- [Release notes](https://github.com/TryGhost/node-sqlite3/releases)
- [Changelog](https://github.com/TryGhost/node-sqlite3/blob/master/CHANGELOG.md)
- [Commits](https://github.com/TryGhost/node-sqlite3/compare/v5.0.2...v5.0.3)

* fix: Issue with cache in CI pipeline

Co-authored-by: Philip Korsholm <88927411+pKorsholm@users.noreply.github.com>
Co-authored-by: Oliver Windall Juhl <59018053+olivermrbl@users.noreply.github.com>
Co-authored-by: Shahed Nasser <shahednasser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
Adrien de Peretti
2022-06-16 13:57:18 +02:00
committed by GitHub
parent ee9fb8324c
commit 453688682c
17 changed files with 1073 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`/admin/batch GET /admin/batch lists batch jobs created by the user 1`] = `
exports[`/admin/batch-jobs GET /admin/batch-jobs lists batch jobs created by the user 1`] = `
Object {
"batch_jobs": Array [
Object {
@@ -8,6 +8,7 @@ Object {
"created_at": Any<String>,
"created_by": "admin_user",
"deleted_at": null,
"dry_run": false,
"id": "job_3",
"result": null,
"status": "created",
@@ -19,6 +20,7 @@ Object {
"created_at": Any<String>,
"created_by": "admin_user",
"deleted_at": null,
"dry_run": false,
"id": "job_2",
"result": null,
"status": "created",
@@ -30,6 +32,7 @@ Object {
"created_at": Any<String>,
"created_by": "admin_user",
"deleted_at": null,
"dry_run": false,
"id": "job_1",
"result": null,
"status": "created",
@@ -42,3 +45,45 @@ Object {
"offset": 0,
}
`;
exports[`/admin/batch-jobs POST /admin/batch-jobs/ Creates a batch job 1`] = `
Object {
"canceled_at": null,
"completed_at": null,
"confirmed_at": null,
"context": Object {},
"created_at": Any<String>,
"created_by": "admin_user",
"deleted_at": null,
"dry_run": false,
"failed_at": null,
"id": Any<String>,
"pre_processed_at": null,
"processing_at": null,
"result": null,
"status": "created",
"type": "batch_1",
"updated_at": Any<String>,
}
`;
exports[`/admin/batch-jobs POST /admin/batch-jobs/:id/cancel Cancels batch job created by the user 1`] = `
Object {
"canceled_at": Any<String>,
"completed_at": null,
"confirmed_at": null,
"context": Object {},
"created_at": Any<String>,
"created_by": "admin_user",
"deleted_at": null,
"dry_run": false,
"failed_at": null,
"id": "job_1",
"pre_processed_at": null,
"processing_at": null,
"result": null,
"status": "canceled",
"type": "batch_1",
"updated_at": Any<String>,
}
`;

View File

@@ -5,6 +5,8 @@ const { useApi } = require("../../../helpers/use-api")
const { initDb, useDb } = require("../../../helpers/use-db")
const adminSeeder = require("../../helpers/admin-seeder")
const userSeeder = require("../../helpers/user-seeder")
const { simpleBatchJobFactory } = require("../../factories")
jest.setTimeout(50000)
@@ -15,7 +17,41 @@ const adminReqConfig = {
},
}
describe("/admin/batch", () => {
const setupJobDb = async (dbConnection) => {
try {
await adminSeeder(dbConnection)
await userSeeder(dbConnection)
await simpleBatchJobFactory(dbConnection, {
id: "job_1",
type: "batch_1",
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_2",
type: "batch_2",
ready_at: new Date(),
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_3",
type: "batch_2",
ready_at: new Date(),
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_4",
type: "batch_1",
ready_at: new Date(),
created_by: "member-user",
})
} catch (err) {
console.log(err)
throw err
}
}
describe("/admin/batch-jobs", () => {
let medusaProcess
let dbConnection
@@ -32,34 +68,9 @@ describe("/admin/batch", () => {
medusaProcess.kill()
})
describe("GET /admin/batch", () => {
describe("GET /admin/batch-jobs", () => {
beforeEach(async () => {
try {
await simpleBatchJobFactory(dbConnection, {
id: "job_1",
type: "batch_1",
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_2",
type: "batch_2",
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_3",
type: "batch_2",
created_by: "admin_user",
})
await simpleBatchJobFactory(dbConnection, {
id: "job_4",
type: "batch_1",
created_by: "not_this_user",
})
await adminSeeder(dbConnection)
} catch (err) {
console.log(err)
throw err
}
await setupJobDb(dbConnection)
})
afterEach(async () => {
@@ -69,7 +80,7 @@ describe("/admin/batch", () => {
it("lists batch jobs created by the user", async () => {
const api = useApi()
const response = await api.get("/admin/batch", adminReqConfig)
const response = await api.get("/admin/batch-jobs", adminReqConfig)
expect(response.status).toEqual(200)
expect(response.data.batch_jobs.length).toEqual(3)
@@ -78,17 +89,196 @@ describe("/admin/batch", () => {
{
created_at: expect.any(String),
updated_at: expect.any(String),
created_by: "admin_user"
},
{
created_at: expect.any(String),
updated_at: expect.any(String),
created_by: "admin_user"
},
{
created_at: expect.any(String),
updated_at: expect.any(String),
created_by: "admin_user"
},
],
})
})
})
describe("GET /admin/batch-jobs/:id", () => {
beforeEach(async () => {
await setupJobDb(dbConnection)
})
afterEach(async () => {
const db = useDb()
await db.teardown()
})
it("return batch job created by the user", async () => {
const api = useApi()
const response = await api.get("/admin/batch-jobs/job_1", adminReqConfig)
expect(response.status).toEqual(200)
expect(response.data.batch_job).toEqual(expect.objectContaining({
created_at: expect.any(String),
updated_at: expect.any(String),
created_by: "admin_user"
}))
})
it("should fail on batch job created by other user", async () => {
const api = useApi()
await api.get("/admin/batch-jobs/job_4", adminReqConfig)
.catch((err) => {
expect(err.response.status).toEqual(400)
expect(err.response.data.type).toEqual("not_allowed")
expect(err.response.data.message).toEqual(
"Cannot access a batch job that does not belong to the logged in user"
)
})
})
})
describe("POST /admin/batch-jobs/", () => {
beforeEach(async() => {
try {
await adminSeeder(dbConnection)
} catch (err) {
console.log(err)
throw err
}
})
afterEach(async() => {
const db = useDb()
await db.teardown()
})
it("Creates a batch job", async() => {
const api = useApi()
const response = await api.post(
"/admin/batch-jobs",
{
type: "batch_1",
context: {},
},
adminReqConfig
)
expect(response.status).toEqual(201)
expect(response.data.batch_job).toMatchSnapshot({
created_by: "admin_user",
status: "created",
id: expect.any(String),
created_at: expect.any(String),
updated_at: expect.any(String),
})
})
})
describe("POST /admin/batch-jobs/:id/confirm", () => {
beforeEach(async () => {
await setupJobDb(dbConnection)
})
afterEach(async () => {
const db = useDb()
await db.teardown()
})
it("Fails to confirm a batch job created by a different user", async () => {
const api = useApi()
const jobId = "job_4"
api
.post(`/admin/batch-jobs/${jobId}/confirm`, {}, adminReqConfig)
.catch((err) => {
expect(err.response.status).toEqual(400)
expect(err.response.data.type).toEqual("not_allowed")
expect(err.response.data.message).toEqual(
"Cannot access a batch job that does not belong to the logged in user"
)
})
})
})
describe("POST /admin/batch-jobs/:id/cancel", () => {
beforeEach(async () => {
try {
await setupJobDb(dbConnection)
await simpleBatchJobFactory(dbConnection, {
id: "job_complete",
type: "batch_1",
created_by: "admin_user",
completed_at: new Date(),
})
} catch (e) {
console.log(e)
throw e
}
})
afterEach(async () => {
const db = useDb()
await db.teardown()
})
it("Cancels batch job created by the user", async () => {
const api = useApi()
const jobId = "job_1"
const response = await api.post(
`/admin/batch-jobs/${jobId}/cancel`,
{},
adminReqConfig
)
expect(response.status).toEqual(200)
expect(response.data.batch_job).toMatchSnapshot({
created_at: expect.any(String),
updated_at: expect.any(String),
canceled_at: expect.any(String),
status: "canceled",
})
})
it("Fails to cancel a batch job created by a different user", async () => {
expect.assertions(3)
const api = useApi()
const jobId = "job_4"
api
.post(`/admin/batch-jobs/${jobId}/cancel`, {}, adminReqConfig)
.catch((err) => {
expect(err.response.status).toEqual(400)
expect(err.response.data.type).toEqual("not_allowed")
expect(err.response.data.message).toEqual(
"Cannot access a batch job that does not belong to the logged in user"
)
})
})
it("Fails to cancel a batch job that is already complete", async () => {
expect.assertions(3)
const api = useApi()
const jobId = "job_complete"
await api
.post(`/admin/batch-jobs/${jobId}/cancel`, {}, adminReqConfig)
.catch((err) => {
expect(err.response.status).toEqual(400)
expect(err.response.data.type).toEqual("not_allowed")
expect(err.response.data.message).toEqual(
"Cannot cancel completed batch job"
)
})
})
})
})

View File

@@ -13,3 +13,5 @@ export * from "./simple-shipping-option-factory"
export * from "./simple-shipping-method-factory"
export * from "./simple-product-type-tax-rate-factory"
export * from "./simple-price-list-factory"
export * from "./simple-batch-job-factory"

View File

@@ -7,6 +7,8 @@ export type BatchJobFactoryData = {
status?: BatchJobStatus
created_by?: string
context?: Record<string, unknown>
awaiting_confirmation_at?: Date | string
completed_at?: Date | string
}
export const simpleBatchJobFactory = async (
@@ -18,6 +20,8 @@ export const simpleBatchJobFactory = async (
const job = manager.create(BatchJob, {
id: data.id,
status: data.status ?? BatchJobStatus.CREATED,
awaiting_confirmation_at: data.awaiting_confirmation_at ?? null,
completed_at: data.completed_at ?? null,
type: data.type ?? "test-job",
created_by: data.created_by ?? null,
context: data.context ?? {},

View File

@@ -5,26 +5,30 @@ const importFrom = require("import-from")
module.exports = {
bootstrapApp: async ({ cwd } = {}) => {
const app = express()
try {
const app = express()
const loaders = importFrom(
cwd || process.cwd(),
"@medusajs/medusa/dist/loaders"
).default
const loaders = importFrom(
cwd || process.cwd(),
"@medusajs/medusa/dist/loaders"
).default
const { container, dbConnection } = await loaders({
directory: path.resolve(cwd || process.cwd()),
expressApp: app,
isTest: false,
})
const { container, dbConnection } = await loaders({
directory: path.resolve(cwd || process.cwd()),
expressApp: app,
isTest: false,
})
const PORT = await getPort()
const PORT = await getPort()
return {
container,
db: dbConnection,
app,
port: PORT,
return {
container,
db: dbConnection,
app,
port: PORT,
}
} catch (e) {
console.log(e)
}
},
}

View File

@@ -0,0 +1,30 @@
import { BatchJobService } from "../../../../services"
/**
* @oas [post] /batch-jobs/{id}/cancel
* operationId: "PostBatchJobsBatchJobCancel"
* summary: "Marks a batch job as canceled"
* description: "Marks a batch job as canceled"
* x-authenticated: true
* parameters:
* - (path) id=* {string} The id of the batch job.
* tags:
* - Batch Job
* responses:
* "200":
* description: OK
* content:
* application/json:
* schema:
* properties:
* batch_job:
* $ref: "#/components/schemas/batch_job"
*/
export default async (req, res) => {
let batch_job = req.batch_job
const batchJobService: BatchJobService = req.scope.resolve("batchJobService")
batch_job = await batchJobService.cancel(batch_job)
res.json({ batch_job })
}

View File

@@ -0,0 +1,31 @@
import { BatchJobService } from "../../../../services"
/**
* @oas [post] /batch-jobs/{id}/confirm
* operationId: "PostBatchJobsBatchJobConfirmProcessing"
* summary: "Confirm a batch job"
* description: "Confirms that a previously requested batch job should be executed."
* x-authenticated: true
* parameters:
* - (path) id=* {string} The id of the batch job.
* tags:
* - Batch Job
* responses:
* "200":
* description: OK
* content:
* application/json:
* schema:
* properties:x
* batch_job:
* $ref: "#/components/schemas/batch_job"
*/
export default async (req, res) => {
let batch_job = req.batch_job
const batchJobService: BatchJobService = req.scope.resolve("batchJobService")
batch_job = await batchJobService.confirm(batch_job)
res.json({ batch_job })
}

View File

@@ -0,0 +1,51 @@
import { IsBoolean, IsObject, IsOptional, IsString } from "class-validator"
import BatchJobService from "../../../../services/batch-job"
import { validator } from "../../../../utils/validator"
/**
* @oas [post] /batch-jobs
* operationId: "PostBatchJobs"
* summary: "Create a Batch Job"
* description: "Creates a Batch Job."
* x-authenticated: true
* parameters:
* - (body) type=* {string} The type of batch job to start.
* - (body) context=* {string} Additional infomration regarding the batch to be used for processing.
* - (body) dry_run=* {boolean} Set a batch job in dry_run mode to get some information on what will be done without applying any modifications.
* tags:
* - Customer
* responses:
* 201:
* description: OK
* content:
* application/json:
* schema:
* properties:
* batch_job:
* $ref: "#/components/schemas/batch_job"
*/
export default async (req, res) => {
const validated = await validator(AdminPostBatchesReq, req.body)
const userId = req.user.id ?? req.user.userId
const batchJobService: BatchJobService = req.scope.resolve("batchJobService")
const batch_job = await batchJobService.create({
...validated,
created_by: userId,
})
res.status(201).json({ batch_job })
}
export class AdminPostBatchesReq {
@IsString()
type: string
@IsObject()
context: Record<string, unknown>
@IsBoolean()
@IsOptional()
dry_run = false
}

View File

@@ -0,0 +1,24 @@
/**
* @oas [get] /batch-jobs/{id}
* operationId: "GetBatchJobsBatchJob"
* summary: "Retrieve a Batch Job"
* description: "Retrieves a Batch Job."
* x-authenticated: true
* parameters:
* - (path) id=* {string} The id of the Batch Job
* tags:
* - Batch Job
* responses:
* "200":
* description: OK
* content:
* application/json:
* schema:
* properties:
* batch_job:
* $ref: "#/components/schemas/batch_job"
*/
export default async (req, res) => {
const batch_job = req.batch_job
res.status(200).json({ batch_job: batch_job })
}

View File

@@ -1,13 +1,17 @@
import { Router } from "express"
import { BatchJob } from "../../../.."
import { DeleteResponse, PaginatedResponse } from "../../../../types/common"
import middlewares, { transformQuery } from "../../../middlewares"
import { AdminGetBatchParams } from "./list-batch-jobs"
import middlewares, {
transformQuery,
getRequestedBatchJob,
canAccessBatchJob,
} from "../../../middlewares"
export default (app) => {
const route = Router()
app.use("/batch", route)
app.use("/batch-jobs", route)
route.get(
"/",
@@ -17,6 +21,20 @@ export default (app) => {
}),
middlewares.wrap(require("./list-batch-jobs").default)
)
route.post("/", middlewares.wrap(require("./create-batch-job").default))
const batchJobRouter = Router({ mergeParams: true })
route.use("/:id", getRequestedBatchJob, canAccessBatchJob, batchJobRouter)
batchJobRouter.get("/", middlewares.wrap(require("./get-batch-job").default))
batchJobRouter.post(
"/confirm",
middlewares.wrap(require("./confirm-batch-job").default)
)
batchJobRouter.post(
"/cancel",
middlewares.wrap(require("./cancel-batch-job").default)
)
return app
}
@@ -32,7 +50,6 @@ export type AdminBatchJobListRes = PaginatedResponse & {
export const defaultAdminBatchFields = [
"id",
"status",
"type",
"context",
"result",

View File

@@ -1,7 +1,6 @@
import { Type } from "class-transformer"
import {
IsArray,
IsEnum,
IsNumber,
IsOptional,
IsString,
@@ -9,14 +8,13 @@ import {
} from "class-validator"
import { pickBy } from "lodash"
import BatchJobService from "../../../../services/batch-job"
import { BatchJobStatus } from "../../../../types/batch-job"
import { DateComparisonOperator } from "../../../../types/common"
import { IsType } from "../../../../utils/validators/is-type"
import { Request } from "express"
/**
* @oas [get] /batch
* operationId: "GetBatch"
* @oas [get] /batch-jobs
* operationId: "GetBatchJobs"
* summary: "List Batch Jobs"
* description: "Retrieve a list of Batch Jobs."
* x-authenticated: true
@@ -24,7 +22,11 @@ import { Request } from "express"
* - (query) limit {string} The number of collections to return.
* - (query) offset {string} The offset of collections to return.
* - (query) type {string | string[]} Filter by the batch type
* - (query) status {string} Filter by the status of the batch operation
* - (query) confirmed_at {DateComparisonOperator} Date comparison for when resulting collections was confirmed, i.e. less than, greater than etc.
* - (query) pre_processed_at {DateComparisonOperator} Date comparison for when resulting collections was pre processed, i.e. less than, greater than etc.
* - (query) completed_at {DateComparisonOperator} Date comparison for when resulting collections was completed, i.e. less than, greater than etc.
* - (query) failed_at {DateComparisonOperator} Date comparison for when resulting collections was failed, i.e. less than, greater than etc.
* - (query) canceled_at {DateComparisonOperator} Date comparison for when resulting collections was canceled, i.e. less than, greater than etc.
* - (query) order {string} Order used when retrieving batch jobs
* - (query) expand[] {string} (Comma separated) Which fields should be expanded in each order of the result.
* - (query) fields[] {string} (Comma separated) Which fields should be included in each order of the result.
@@ -95,18 +97,32 @@ export class AdminGetBatchParams extends AdminGetBatchPaginationParams {
@IsType([String, [String]])
id?: string | string[]
@IsOptional()
@IsArray()
@IsEnum(BatchJobStatus, { each: true })
status?: BatchJobStatus[]
@IsArray()
@IsOptional()
type?: string[]
@IsOptional()
@ValidateNested()
@Type(() => DateComparisonOperator)
confirmed_at?: DateComparisonOperator
@IsOptional()
@Type(() => DateComparisonOperator)
pre_processed_at?: DateComparisonOperator
@IsOptional()
@Type(() => DateComparisonOperator)
completed_at?: DateComparisonOperator
@IsOptional()
@Type(() => DateComparisonOperator)
failed_at?: DateComparisonOperator
@IsOptional()
@Type(() => DateComparisonOperator)
canceled_at?: DateComparisonOperator
@IsType([DateComparisonOperator])
@IsOptional()
created_at?: DateComparisonOperator
@IsOptional()

View File

@@ -5,15 +5,38 @@ export class addBatchJobModel1649775522087 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE TYPE "batch_job_status_enum" AS ENUM('created', 'processing', 'awaiting_confirmation', 'completed')`
`CREATE TABLE "batch_job"
(
"id" character varying NOT NULL,
"type" text NOT NULL,
"created_by" character varying,
"context" jsonb,
"result" jsonb,
"dry_run" boolean NOT NULL DEFAULT FALSE,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"pre_processed_at" TIMESTAMP WITH TIME ZONE,
"confirmed_at" TIMESTAMP WITH TIME ZONE,
"processing_at" TIMESTAMP WITH TIME ZONE,
"completed_at" TIMESTAMP WITH TIME ZONE,
"failed_at" TIMESTAMP WITH TIME ZONE,
"canceled_at" TIMESTAMP WITH TIME ZONE,
"updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"deleted_at" TIMESTAMP WITH TIME ZONE,
CONSTRAINT "PK_e57f84d485145d5be96bc6d871e" PRIMARY KEY ("id")
)`
)
await queryRunner.query(
`CREATE TABLE "batch_job" ("id" character varying NOT NULL, "type" text NOT NULL, "status" "public"."batch_job_status_enum" NOT NULL, "created_by" text, "context" jsonb, "result" jsonb, "created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "deleted_at" TIMESTAMP WITH TIME ZONE, CONSTRAINT "PK_e57f84d485145d5be96bc6d871e" PRIMARY KEY ("id"))`
`ALTER TABLE "batch_job"
ADD CONSTRAINT "FK_fa53ca4f5fd90605b532802a626" FOREIGN KEY ("created_by") REFERENCES "user" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION`
)
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "batch_job"
DROP CONSTRAINT "FK_fa53ca4f5fd90605b532802a626"`
)
await queryRunner.query(`DROP TABLE "batch_job"`)
await queryRunner.query(`DROP TYPE "batch_job_status_enum"`)
}
}
}

View File

@@ -1,30 +1,85 @@
import { BeforeInsert, Column, Entity, PrimaryColumn } from "typeorm"
import { AfterLoad, BeforeInsert, Column, Entity, JoinColumn, ManyToOne } from "typeorm"
import { BatchJobStatus } from "../types/batch-job"
import { DbAwareColumn } from "../utils/db-aware-column"
import { DbAwareColumn, resolveDbType } from "../utils/db-aware-column"
import { SoftDeletableEntity } from "../interfaces/models/soft-deletable-entity"
import { generateEntityId } from "../utils/generate-entity-id"
import { User } from "./user"
@Entity()
export class BatchJob extends SoftDeletableEntity {
@DbAwareColumn({ type: "text" })
type: string
@DbAwareColumn({ type: "enum", enum: BatchJobStatus })
status: BatchJobStatus
@Column({ type: "text", nullable: true })
@Column({ nullable: true })
created_by: string | null
@ManyToOne(() => User)
@JoinColumn({ name: "created_by" })
created_by_user: User
@DbAwareColumn({ type: "jsonb", nullable: true })
context: Record<string, unknown>
context: { retry_count?: number; max_retry?: number } & Record<string, unknown>
@DbAwareColumn({ type: "jsonb", nullable: true })
result: Record<string, unknown>
@Column({ type: "boolean", nullable: false, default: false })
dry_run: boolean = false;
@Column({ type: resolveDbType("timestamptz"), nullable: true })
pre_processed_at?: Date
@Column({ type: resolveDbType("timestamptz"), nullable: true })
processing_at?: Date
@Column({ type: resolveDbType("timestamptz"), nullable: true })
confirmed_at?: Date
@Column({ type: resolveDbType("timestamptz"), nullable: true })
completed_at?: Date
@Column({ type: resolveDbType("timestamptz"), nullable: true })
canceled_at?: Date
@Column({ type: resolveDbType("timestamptz"), nullable: true })
failed_at?: Date
status: BatchJobStatus
@AfterLoad()
loadStatus(): void {
/* Always keep the status order consistent. */
if (this.pre_processed_at) {
this.status = BatchJobStatus.PRE_PROCESSED
}
if (this.confirmed_at) {
this.status = BatchJobStatus.CONFIRMED
}
if (this.processing_at) {
this.status = BatchJobStatus.PROCESSING
}
if (this.completed_at) {
this.status = BatchJobStatus.COMPLETED
}
if (this.canceled_at) {
this.status = BatchJobStatus.CANCELED
}
if (this.failed_at) {
this.status = BatchJobStatus.FAILED
}
this.status = this.status ?? BatchJobStatus.CREATED
}
@BeforeInsert()
private beforeInsert(): void {
this.id = generateEntityId(this.id, "batch")
}
toJSON() {
this.loadStatus()
return this
}
}
/**
@@ -47,18 +102,40 @@ export class BatchJob extends SoftDeletableEntity {
* type: string
* enum:
* - created
* - pre_processed
* - processing
* - awaiting_confirmation
* - completed
* - canceled
* - failed
* created_by:
* description: "The unique identifier of the user that created the batch job."
* type: string
* context:
* description: "The context of the batch job, the type of the batch job determines what the context should contain."
* type: object
* dry_run:
* description: "Specify if the job must apply the modifications or not."
* type: boolean
* default: false
* result:
* description: "The result of the batch job."
* type: object
* pre_processed_at:
* description: "The date from which the job has been pre processed."
* type: string
* format: date-time
* confirmed_at:
* description: "The date when the confirmation has been done."
* type: string
* format: date-time
* completed_at:
* description: "The date of the completion."
* type: string
* format: date-time
* canceled_at:
* description: "The date of the concellation."
* type: string
* format: date-time
* created_at:
* description: "The date with timezone at which the resource was created."
* type: string

View File

@@ -0,0 +1,199 @@
import { IdMap, MockManager, MockRepository } from "medusa-test-utils"
import BatchJobService from "../batch-job"
import { EventBusService } from "../index"
import { BatchJobStatus } from "../../types/batch-job"
import { BatchJob } from "../../models"
const eventBusServiceMock = {
emit: jest.fn(),
withTransaction: function() {
return this
},
} as unknown as EventBusService
const batchJobRepositoryMock = MockRepository({
create: jest.fn().mockImplementation((data) => {
return Object.assign(new BatchJob(), data)
})
})
describe('BatchJobService', () => {
const batchJobId_1 = IdMap.getId("batchJob_1")
const batchJobService = new BatchJobService({
manager: MockManager,
eventBusService: eventBusServiceMock,
batchJobRepository: batchJobRepositoryMock
})
afterEach(() => {
jest.clearAllMocks()
})
describe('update status', () => {
describe("confirm", () => {
it('should be able to confirm_processing a batch job to emit the processing event', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: true,
status: BatchJobStatus.PRE_PROCESSED
})
const updatedBatchJob = await batchJobService.confirm(batchJob)
expect(updatedBatchJob.processing_at).not.toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.CONFIRMED, { id: batchJobId_1 })
})
it('should not be able to confirm a batch job with the wrong status', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: true,
status: BatchJobStatus.CREATED
})
const err = await batchJobService.confirm(batchJob)
.catch(e => e)
expect(err).toBeTruthy()
expect(err.message).toBe("Cannot confirm processing for a batch job that is not pre processed")
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(0)
})
})
describe("complete", () => {
it('should be able to complete a batch job', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: true,
status: BatchJobStatus.PROCESSING
})
const updatedBatchJob = await batchJobService.complete(batchJob)
expect(updatedBatchJob.completed_at).toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.COMPLETED, { id: batchJobId_1 })
const batchJob2 = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: false,
status: BatchJobStatus.PROCESSING
})
const updatedBatchJob2 = await batchJobService.complete(batchJob2)
expect(updatedBatchJob2.completed_at).toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.COMPLETED, { id: batchJobId_1 })
})
it('should not be able to complete a batch job with the wrong status', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: true,
status: BatchJobStatus.CREATED
})
const err = await batchJobService.complete(batchJob)
.catch(e => e)
expect(err).toBeTruthy()
expect(err.message).toBe( `Cannot complete a batch job with status "${batchJob.status}". The batch job must be processing`)
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(0)
const batchJob2 = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: false,
status: BatchJobStatus.PRE_PROCESSED
})
const err2 = await batchJobService.complete(batchJob2)
.catch(e => e)
expect(err2).toBeTruthy()
expect(err2.message).toBe( `Cannot complete a batch job with status "${batchJob2.status}". The batch job must be processing`)
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(0)
})
})
describe("pre processed", () => {
it('should be able to mark as pre processed a batch job in dry_run', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: true,
status: BatchJobStatus.CREATED
})
const updatedBatchJob = await batchJobService.setPreProcessingDone(batchJob)
expect(updatedBatchJob.pre_processed_at).toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.PRE_PROCESSED, { id: batchJobId_1 })
})
it('should be able to mark as completed a batch job that has been pre processed but not in dry_run', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
dry_run: false,
status: BatchJobStatus.CREATED
})
const updatedBatchJob = await batchJobService.setPreProcessingDone(batchJob)
expect(updatedBatchJob.pre_processed_at).toBeTruthy()
expect(updatedBatchJob.confirmed_at).toBeTruthy()
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(2)
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.PRE_PROCESSED, { id: batchJobId_1 })
expect(eventBusServiceMock.emit)
.toHaveBeenLastCalledWith(BatchJobService.Events.CONFIRMED, { id: batchJobId_1 })
})
})
describe("cancel", () => {
it('should be able to cancel a batch job', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
status: BatchJobStatus.CREATED
})
const updatedBatchJob = await batchJobService.cancel(batchJob)
expect(updatedBatchJob.canceled_at).toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.CANCELED, { id: batchJobId_1 })
})
it('should not be able to cancel a batch job with the wrong status', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
status: BatchJobStatus.COMPLETED
})
const err = await batchJobService.cancel(batchJob)
.catch(e => e)
expect(err).toBeTruthy()
expect(err.message).toBe("Cannot cancel completed batch job")
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(0)
})
})
describe("processing", () => {
it('should be able to mark as processing a batch job', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
status: BatchJobStatus.CONFIRMED
})
const updatedBatchJob = await batchJobService.setProcessing(batchJob)
expect(updatedBatchJob.processing_at).toBeTruthy()
expect(eventBusServiceMock.emit)
.toHaveBeenCalledWith(BatchJobService.Events.PROCESSING, { id: batchJobId_1 })
})
it('should not be able to mark as processing a batch job with the wrong status', async () => {
const batchJob = batchJobRepositoryMock.create({
id: batchJobId_1,
status: BatchJobStatus.COMPLETED
})
const err = await batchJobService.setProcessing(batchJob)
.catch(e => e)
expect(err).toBeTruthy()
expect(err.message).toBe("Cannot mark a batch job as processing if the status is different that confirmed")
expect(eventBusServiceMock.emit).toHaveBeenCalledTimes(0)
})
})
})
})

View File

@@ -1,34 +1,99 @@
import { EntityManager } from "typeorm"
import { DeepPartial, EntityManager } from "typeorm"
import { BatchJob } from "../models"
import { BatchJobRepository } from "../repositories/batch-job"
import { FilterableBatchJobProps } from "../types/batch-job"
import {
BatchJobCreateProps,
BatchJobStatus,
BatchJobUpdateProps,
FilterableBatchJobProps,
} from "../types/batch-job"
import { FindConfig } from "../types/common"
import { TransactionBaseService } from "../interfaces"
import { buildQuery, validateId } from "../utils"
import { buildQuery } from "../utils"
import { MedusaError } from "medusa-core-utils"
import { EventBusService } from "./index"
type InjectedDependencies = {
manager: EntityManager
eventBusService: EventBusService
batchJobRepository: typeof BatchJobRepository
}
class BatchJobService extends TransactionBaseService<BatchJobService> {
protected manager_: EntityManager
protected transactionManager_: EntityManager | undefined
protected readonly batchJobRepository_: typeof BatchJobRepository
static readonly Events = {
CREATED: "batch.created",
UPDATED: "batch.updated",
PRE_PROCESSED: "batch.pre_processed",
CONFIRMED: "batch.confirmed",
PROCESSING: "batch.processing",
COMPLETED: "batch.completed",
CANCELED: "batch.canceled",
FAILED: "batch.failed",
}
constructor({ manager, batchJobRepository }: InjectedDependencies) {
super({ manager, batchJobRepository })
protected readonly manager_: EntityManager
protected transactionManager_: EntityManager | undefined
protected readonly batchJobRepository_: typeof BatchJobRepository
protected readonly eventBus_: EventBusService
protected batchJobStatusMapToProps = new Map<
BatchJobStatus,
{ entityColumnName: string; eventType: string }
>([
[
BatchJobStatus.PRE_PROCESSED,
{
entityColumnName: "pre_processed_at",
eventType: BatchJobService.Events.PRE_PROCESSED,
},
],
[
BatchJobStatus.CONFIRMED,
{
entityColumnName: "confirmed_at",
eventType: BatchJobService.Events.CONFIRMED,
},
],
[
BatchJobStatus.PROCESSING,
{
entityColumnName: "processing_at",
eventType: BatchJobService.Events.PROCESSING,
},
],
[
BatchJobStatus.COMPLETED,
{
entityColumnName: "completed_at",
eventType: BatchJobService.Events.COMPLETED,
},
],
[
BatchJobStatus.CANCELED,
{
entityColumnName: "canceled_at",
eventType: BatchJobService.Events.CANCELED,
},
],
[
BatchJobStatus.FAILED,
{
entityColumnName: "failed_at",
eventType: BatchJobService.Events.FAILED,
},
],
])
constructor({
manager,
batchJobRepository,
eventBusService,
}: InjectedDependencies) {
super({ manager, batchJobRepository, eventBusService })
this.manager_ = manager
this.batchJobRepository_ = batchJobRepository
this.eventBus_ = eventBusService
}
async retrieve(
@@ -71,6 +136,206 @@ class BatchJobService extends TransactionBaseService<BatchJobService> {
}
)
}
async create(data: BatchJobCreateProps): Promise<BatchJob> {
return await this.atomicPhase_(async (manager) => {
const batchJobRepo: BatchJobRepository = manager.getCustomRepository(
this.batchJobRepository_
)
const batchJob = batchJobRepo.create(data)
const result = await batchJobRepo.save(batchJob)
await this.eventBus_
.withTransaction(manager)
.emit(BatchJobService.Events.CREATED, {
id: result.id,
})
return result
})
}
async update(
batchJobId: string,
data: BatchJobUpdateProps
): Promise<BatchJob> {
return await this.atomicPhase_(async (manager) => {
const batchJobRepo: BatchJobRepository = manager.getCustomRepository(
this.batchJobRepository_
)
let batchJob = await this.retrieve(batchJobId)
const { context, ...rest } = data
if (context) {
batchJob.context = { ...batchJob.context, ...context }
}
Object.keys(rest)
.filter((key) => typeof rest[key] !== `undefined`)
.forEach((key) => {
batchJob[key] = rest[key]
})
batchJob = await batchJobRepo.save(batchJob)
await this.eventBus_
.withTransaction(manager)
.emit(BatchJobService.Events.UPDATED, {
id: batchJob.id,
})
return batchJob
})
}
protected async updateStatus(
batchJobOrId: BatchJob | string,
status: BatchJobStatus
): Promise<BatchJob | never> {
const transactionManager = this.transactionManager_ ?? this.manager_
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
const { entityColumnName, eventType } =
this.batchJobStatusMapToProps.get(status) || {}
if (!entityColumnName || !eventType) {
throw new MedusaError(
MedusaError.Types.INVALID_DATA,
`Unable to update the batch job status from ${batchJob.status} to ${status}. The status doesn't exist`
)
}
batchJob[entityColumnName] = new Date()
const batchJobRepo = transactionManager.getCustomRepository(
this.batchJobRepository_
)
batchJob = await batchJobRepo.save(batchJob)
batchJob.loadStatus()
this.eventBus_.withTransaction(transactionManager).emit(eventType, {
id: batchJob.id,
})
return batchJob
}
async confirm(batchJobOrId: string | BatchJob): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
if (batchJob.status !== BatchJobStatus.PRE_PROCESSED) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
"Cannot confirm processing for a batch job that is not pre processed"
)
}
return await this.updateStatus(batchJob, BatchJobStatus.CONFIRMED)
})
}
async complete(batchJobOrId: string | BatchJob): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
if (batchJob.status !== BatchJobStatus.PROCESSING) {
throw new MedusaError(
MedusaError.Types.INVALID_DATA,
`Cannot complete a batch job with status "${batchJob.status}". The batch job must be processing`
)
}
return await this.updateStatus(batchJob, BatchJobStatus.COMPLETED)
})
}
async cancel(batchJobOrId: string | BatchJob): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
if (batchJob.status === BatchJobStatus.COMPLETED) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
"Cannot cancel completed batch job"
)
}
return await this.updateStatus(batchJob, BatchJobStatus.CANCELED)
})
}
async setPreProcessingDone(
batchJobOrId: string | BatchJob
): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
if (batchJob.status === BatchJobStatus.PRE_PROCESSED) {
return batchJob
}
if (batchJob.status !== BatchJobStatus.CREATED) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
"Cannot mark a batch job as pre processed if it is not in created status"
)
}
batchJob = await this.updateStatus(
batchJobOrId,
BatchJobStatus.PRE_PROCESSED
)
if (batchJob.dry_run) {
return batchJob
}
return await this.confirm(batchJob)
})
}
async setProcessing(
batchJobOrId: string | BatchJob
): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
let batchJob: BatchJob = batchJobOrId as BatchJob
if (typeof batchJobOrId === "string") {
batchJob = await this.retrieve(batchJobOrId)
}
if (batchJob.status !== BatchJobStatus.CONFIRMED) {
throw new MedusaError(
MedusaError.Types.NOT_ALLOWED,
"Cannot mark a batch job as processing if the status is different that confirmed"
)
}
return await this.updateStatus(batchJob, BatchJobStatus.PROCESSING)
})
}
async setFailed(batchJobOrId: string | BatchJob): Promise<BatchJob | never> {
return await this.atomicPhase_(async () => {
return await this.updateStatus(batchJobOrId, BatchJobStatus.FAILED)
})
}
}
export default BatchJobService

View File

@@ -40,3 +40,4 @@ export { default as TaxRateService } from "./tax-rate"
export { default as TaxProviderService } from "./tax-provider"
export { default as ProductTypeService } from "./product-type"
export { default as PricingService } from "./pricing"
export { default as BatchJobService } from "./batch-job"

View File

@@ -8,14 +8,20 @@ import {
} from "class-validator"
import { IsType } from "../utils/validators/is-type"
import { DateComparisonOperator } from "./common"
import { BatchJob } from "../models"
export enum BatchJobStatus {
CREATED = "created",
PRE_PROCESSED = "pre_processed",
CONFIRMED = "confirmed",
PROCESSING = "processing",
AWAITING_CONFIRMATION = "awaiting_confirmation",
COMPLETED = "completed",
CANCELED = "canceled",
FAILED = "failed",
}
export type BatchJobUpdateProps = Partial<Pick<BatchJob, "context" | "result">>
export class FilterableBatchJobProps {
@IsOptional()
@IsType([String, [String]])
@@ -44,3 +50,8 @@ export class FilterableBatchJobProps {
@Type(() => DateComparisonOperator)
updated_at?: DateComparisonOperator
}
export type BatchJobCreateProps = Pick<
BatchJob,
"context" | "type" | "created_by" | "dry_run"
>