Commit 87efadd6 Harvey

koa支持koa

1 个父辈 08d998f1
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: 'express',
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=express-test.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: 'express',
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=express.js.map
\ No newline at end of file \ No newline at end of file
...@@ -7,12 +7,12 @@ class JobExecutor { ...@@ -7,12 +7,12 @@ class JobExecutor {
if (opts.frpcConfigPath) { if (opts.frpcConfigPath) {
require("./xxl-job/frpc").createFrpc(opts.frpcConfigPath); require("./xxl-job/frpc").createFrpc(opts.frpcConfigPath);
} }
new (require("./xxl-job/index").XxlJobExecutor)(opts.xxlJob, this.jobHandlers).applyMiddleware({ const app = new (require("./xxl-job/index").XxlJobExecutor)(opts.xxlJob, this.jobHandlers).applyMiddleware({
app: opts.app, appType: opts.appType,
domain: opts.appDomain domain: opts.appDomain
}); });
opts.app.listen(opts.port, () => { app.listen(opts.port, () => {
console.log(`[XXL-JOB] Executor "${opts.xxlJob.executorKey}" is running`); console.log(`[XXL-JOB] Executor ${opts.appType} "${opts.xxlJob.executorKey}" is running`);
console.log(`- Local: http://localhost:${opts.port}`); console.log(`- Local: http://localhost:${opts.port}`);
console.log(`- Public: ${opts.appDomain}`); console.log(`- Public: ${opts.appDomain}`);
}); });
......
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: "koa",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=koa.js.map
\ No newline at end of file \ No newline at end of file
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
const express = require("express");
const index_1 = require("./index"); const index_1 = require("./index");
const path = require("path"); const path = require("path");
const config = { const config = {
app: express().use(require('body-parser').json()), appType: "express",
appDomain: "http://10.0.1.92:17001/", appDomain: "http://10.0.1.92:17001/",
port: 8088, port: 8088,
xxlJob: { xxlJob: {
......
...@@ -17,10 +17,22 @@ class Executor { ...@@ -17,10 +17,22 @@ class Executor {
this.jobHandlers = jobHandlers; this.jobHandlers = jobHandlers;
this.jobManager = new job_manager_1.JobManager(jobLogPath); this.jobManager = new job_manager_1.JobManager(jobLogPath);
} }
applyMiddleware({ app, domain }) { applyMiddleware({ appType, domain }) {
this.executorUrl = domain; this.executorUrl = domain;
if (appType === 'express') {
return this.applyExpressMiddleware();
}
else if (appType === 'koa') {
return this.applyKoaMiddleware();
}
else {
throw new Error(`Unsupported appType: ${appType}`);
}
}
applyExpressMiddleware() {
const express = require('express'); const express = require('express');
var router = new express.Router(); const app = express().use(require('body-parser').json());
const router = express.Router();
router.use(async (req, res, next) => { router.use(async (req, res, next) => {
res.status(200); res.status(200);
const token = req.headers && req.headers['xxl-job-access-token']; const token = req.headers && req.headers['xxl-job-access-token'];
...@@ -34,26 +46,57 @@ class Executor { ...@@ -34,26 +46,57 @@ class Executor {
} }
next(); next();
}); });
this.addJobRoutes(router); this.addJobRoutes(router, 'express');
app.use(router); app.use(router);
return app;
}
applyKoaMiddleware() {
const Koa = require('koa');
const app = new Koa();
app.use(require('koa-bodyparser')());
const Router = require('koa-router');
const router = new Router();
const bodyParser = require('koa-bodyparser');
router.use(async (ctx, next) => {
ctx.status = 200;
const token = ctx.headers && ctx.headers['xxl-job-access-token'];
if (!!this.accessToken && this.accessToken !== token) {
ctx.body = { code: 500, msg: 'access token incorrect' };
return;
} }
addJobRoutes(router) { if (!ctx.request.body) {
router.post(`/beat`, async (req, res, next) => { ctx.body = { code: 500, msg: 'body is null' };
res.send(this.beat()); return;
}); }
router.post(`/idleBeat`, async (req, res, next) => { await next();
res.send(this.idleBeat(req.body.jobId || -1));
}); });
router.post(`/run`, async (req, res, next) => { this.addJobRoutes(router, 'koa');
res.send(this.run(req.body || {})); app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());
return app;
}
addJobRoutes(router, appType) {
const handleRoute = (path, handler) => {
if (appType === 'express') {
router.post(path, async (req, res) => {
const result = handler(req.body || {});
res.send(result);
}); });
router.post(`/kill`, async (req, res, next) => { }
res.send(this.killJob(req.body.jobId || -1)); else if (appType === 'koa') {
router.post(path, async (ctx) => {
const result = handler(ctx.request.body || {});
ctx.body = result;
}); });
router.post(`/log`, async (req, res, next) => { }
const { logDateTim, logId, fromLineNum } = req.body || {}; };
const data = await this.readLog(logDateTim, logId, fromLineNum); handleRoute('/beat', () => this.beat());
res.send(data); handleRoute('/idleBeat', (body) => this.idleBeat(body.jobId || -1));
handleRoute('/run', (body) => this.run(body || {}));
handleRoute('/kill', (body) => this.killJob(body.jobId || -1));
handleRoute('/log', async (body) => {
const { logDateTim, logId, fromLineNum } = body || {};
return await this.readLog(logDateTim, logId, fromLineNum);
}); });
} }
beat() { beat() {
......
...@@ -9,10 +9,11 @@ class XxlJobExecutor { ...@@ -9,10 +9,11 @@ class XxlJobExecutor {
const { executorKey, scheduleCenterUrl, accessToken, jobLogPath, } = opts; const { executorKey, scheduleCenterUrl, accessToken, jobLogPath, } = opts;
this.executor = new executor_1.Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers); this.executor = new executor_1.Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers);
} }
applyMiddleware({ app, domain }) { applyMiddleware({ appType, domain }) {
this.executor.applyMiddleware({ app, domain }); const app = this.executor.applyMiddleware({ appType, domain });
const registry = this.executor.registry.bind(this.executor); const registry = this.executor.registry.bind(this.executor);
registry() && (this.registryInterval = setInterval(registry, 30000)); registry() && (this.registryInterval = setInterval(registry, 30000));
return app;
} }
async close() { async close() {
if (this.registryInterval) { if (this.registryInterval) {
......
...@@ -11,12 +11,15 @@ ...@@ -11,12 +11,15 @@
"author": "harvey", "author": "harvey",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"axios": "^1.9.0" "axios": "^1.9.0",
"koa-bodyparser": "^4.4.1",
"koa-router": "^13.0.1"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^22.4.1", "@types/node": "^22.4.1",
"body-parser": "^2.2.0",
"express": "^5.1.0", "express": "^5.1.0",
"typescript": "^5.8.3", "koa": "^2.4.1",
"body-parser": "^2.2.0" "typescript": "^5.8.3"
} }
} }
...@@ -3,7 +3,7 @@ import { JobExecutor } from "./index"; ...@@ -3,7 +3,7 @@ import { JobExecutor } from "./index";
import * as path from "path"; import * as path from "path";
const config = { const config = {
app: express().use(require('body-parser').json()), appType:'express',
appDomain: "http://10.0.1.92:17001/", appDomain: "http://10.0.1.92:17001/",
port: 8088, port: 8088,
xxlJob: { xxlJob: {
......
...@@ -18,8 +18,8 @@ interface XxlJobConfig { ...@@ -18,8 +18,8 @@ interface XxlJobConfig {
* 执行器全局配置 * 执行器全局配置
*/ */
interface JobExecutorOptions { interface JobExecutorOptions {
/** Express/Koa 应用实例 */ /** Express/Koa 'express' | 'koa' */
app: any; // Express.Application | Koa.Application; appType: string;
/** 应用对外访问的域名(用于回调) */ /** 应用对外访问的域名(用于回调) */
appDomain: string; appDomain: string;
/** 应用监听端口 */ /** 应用监听端口 */
...@@ -46,17 +46,17 @@ export class JobExecutor { ...@@ -46,17 +46,17 @@ export class JobExecutor {
} }
// 连接 XXL-JOB 调度中心 // 连接 XXL-JOB 调度中心
new (require("./xxl-job/index").XxlJobExecutor)( const app = new (require("./xxl-job/index").XxlJobExecutor)(
opts.xxlJob, opts.xxlJob,
this.jobHandlers this.jobHandlers
).applyMiddleware({ ).applyMiddleware({
app: opts.app, appType: opts.appType,
domain: opts.appDomain domain: opts.appDomain
}); });
// 启动 HTTP 服务 // 启动 HTTP 服务
opts.app.listen(opts.port, () => { app.listen(opts.port, () => {
console.log(`[XXL-JOB] Executor "${opts.xxlJob.executorKey}" is running`); console.log(`[XXL-JOB] Executor ${opts.appType} "${opts.xxlJob.executorKey}" is running`);
console.log(`- Local: http://localhost:${opts.port}`); console.log(`- Local: http://localhost:${opts.port}`);
console.log(`- Public: ${opts.appDomain}`); console.log(`- Public: ${opts.appDomain}`);
}); });
......
import { JobExecutor } from "./index";
import * as path from "path";
const config = {
appType: "koa",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq", // 注:开发环境建议加后缀区分
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml') //根目录下文件夹frp中的frpc.toml
};
var jobExecutor = new JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
})
//批量任务注册
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
]
]
jobExecutor.registerBatch(batchJob);
\ No newline at end of file \ No newline at end of file
...@@ -27,15 +27,29 @@ export class Executor { ...@@ -27,15 +27,29 @@ export class Executor {
/** /**
* 应用执行器中间件 * 应用执行器中间件
* @param {*} app * @param {*} app koa | express
* @param {string} domain * @param {string} domain
*/ */
applyMiddleware({ app, domain }) { applyMiddleware({ appType, domain }) {
this.executorUrl = domain this.executorUrl = domain
if (appType === 'express') {
return this.applyExpressMiddleware()
} else if (appType === 'koa') {
return this.applyKoaMiddleware()
} else {
throw new Error(`Unsupported appType: ${appType}`)
}
}
/**
* Apply middleware for Express
*/
private applyExpressMiddleware() {
const express = require('express') const express = require('express')
var router = new express.Router() const app = express().use(require('body-parser').json())
const router = express.Router()
//请求认证
router.use(async (req, res, next) => { router.use(async (req, res, next) => {
res.status(200) res.status(200)
...@@ -52,43 +66,105 @@ export class Executor { ...@@ -52,43 +66,105 @@ export class Executor {
} }
next() next()
}) })
this.addJobRoutes(router) this.addJobRoutes(router, 'express')
app.use(router) app.use(router)
return app
} }
/** /**
* 添加xxl-job相关的路由,供调度中心访问 * Apply middleware for Koa
* @param {express.Router} router * @param {*} app
* @param {string} baseUri
*/ */
addJobRoutes(router) { private applyKoaMiddleware() {
router.post(`/beat`, async (req, res, next) => { const Koa = require('koa')
res.send(this.beat()) const app = new Koa()
}) app.use(require('koa-bodyparser')())
router.post(`/idleBeat`, async (req, res, next) => { const Router = require('koa-router')
res.send(this.idleBeat(req.body.jobId || -1)) const router = new Router()
}) const bodyParser = require('koa-bodyparser')
router.post(`/run`, async (req, res, next) => { // Request authentication
res.send(this.run(req.body || {})) router.use(async (ctx, next) => {
}) ctx.status = 200
router.post(`/kill`, async (req, res, next) => { const token = ctx.headers && ctx.headers['xxl-job-access-token']
res.send(this.killJob(req.body.jobId || -1))
})
router.post(`/log`, async (req, res, next) => { if (!!this.accessToken && this.accessToken !== token) {
const { logDateTim, logId, fromLineNum } = req.body || {} ctx.body = { code: 500, msg: 'access token incorrect' }
const data = await this.readLog(logDateTim, logId, fromLineNum) return
res.send(data) }
if (!ctx.request.body) {
ctx.body = { code: 500, msg: 'body is null' }
return
}
await next()
}) })
this.addJobRoutes(router, 'koa')
app.use(bodyParser())
app.use(router.routes()).use(router.allowedMethods())
return app
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
addJobRoutes(router, appType) {
// router.post(`/beat`, async (req, res, next) => {
// res.send(this.beat())
// })
// router.post(`/idleBeat`, async (req, res, next) => {
// res.send(this.idleBeat(req.body.jobId || -1))
// })
// router.post(`/run`, async (req, res, next) => {
// res.send(this.run(req.body || {}))
// })
// router.post(`/kill`, async (req, res, next) => {
// res.send(this.killJob(req.body.jobId || -1))
// })
// router.post(`/log`, async (req, res, next) => {
// const { logDateTim, logId, fromLineNum } = req.body || {}
// const data = await this.readLog(logDateTim, logId, fromLineNum)
// res.send(data)
// })
// 定义通用的路由处理方法
const handleRoute = (path: string, handler: (reqBody: any) => any) => {
if (appType === 'express') {
router.post(path, async (req: any, res: any) => {
const result = handler(req.body || {});
res.send(result);
});
} else if (appType === 'koa') {
router.post(path, async (ctx: any) => {
const result = handler(ctx.request.body || {});
ctx.body = result;
});
}
};
// 注册路由
handleRoute('/beat', () => this.beat());
handleRoute('/idleBeat', (body) => this.idleBeat(body.jobId || -1));
handleRoute('/run', (body) => this.run(body || {}));
handleRoute('/kill', (body) => this.killJob(body.jobId || -1));
handleRoute('/log', async (body) => {
const { logDateTim, logId, fromLineNum } = body || {};
return await this.readLog(logDateTim, logId, fromLineNum);
});
} }
/** /**
......
...@@ -26,13 +26,14 @@ export class XxlJobExecutor { ...@@ -26,13 +26,14 @@ export class XxlJobExecutor {
/** /**
* 应用执行器组件 * 应用执行器组件
* @param {Object} args * @param {Object} args
* @param {any} args.app 执行器server, express * @param {any} args.appType 执行器server, express
* @param {string} args.appDomain 执行器 server 地址 * @param {string} args.appDomain 执行器 server 地址
*/ */
public applyMiddleware({ app, domain }) { public applyMiddleware({ appType, domain }) {
this.executor.applyMiddleware({ app, domain }) const app = this.executor.applyMiddleware({ appType, domain })
const registry = this.executor.registry.bind(this.executor) const registry = this.executor.registry.bind(this.executor)
registry() && (this.registryInterval = setInterval(registry, 30000)) registry() && (this.registryInterval = setInterval(registry, 30000))
return app
} }
/** /**
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!