Commit 37bce485 Harvey

no message

1 个父辈 b9232309
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const express = require("express");
const config = require('../config/config.js');
// 实例化 XxlJobExecutor 组件
const XxlJobExecutor = require('../index');
const { jobHandlers } = require('./jobHandlers');
const context = { /* anything*/};
const xxlJobExecutor = new XxlJobExecutor(jobHandlers, context);
// 实例化 express app
const app = require('express')();
const index_1 = require("./job/index");
const index_2 = require("./xxl_job/index");
const app = express();
app.use(require('body-parser').json());
// 应用 XxlJobExecutor 组件
xxlJobExecutor.applyMiddleware({ app, appType: 'EXPRESS', appDomain: 'http://[2408:8352:602:1100:1c8a:80f5:da47:66ef]:8088', path: '' });
new index_2.XxlJobExecutor(config.xxl_job, index_1.job_handlers).applyMiddleware({ app, appDomain: config.app_domain, path: '' });
app.listen(config.port, () => {
console.log(`job-ydn-zq app listening on port ${config.port}`);
});
......
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.job_handlers = void 0;
const job_handlers = new Map();
exports.job_handlers = job_handlers;
//# sourceMappingURL=index.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.demoJobHandler = void 0;
const common_1 = require("../../libs/common");
/**
* demo任务
* @param {any} jobLogger 由xxl-job组件定义的任务logger,会将日志内容输出到文件,可在调度中心查看执行日志
* @param {{ jobParam1: any, jobParam2: any }} jobParams 任务参数
* @param {Object} context 任务上下文
* @return {Promise<void>} 函数必须返回一个 promise
*/
async function demoJobHandler(jobLogger, jobParams, context) {
jobLogger.debug('params: %o, context: %o', jobParams, context);
for (let i = 1; i < 10; i++) {
await (0, common_1.sleep)(1000);
jobLogger.debug(`${i}s passed`);
}
}
exports.demoJobHandler = demoJobHandler;
//# sourceMappingURL=index.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.sleep = void 0;
async function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
exports.sleep = sleep;
//# sourceMappingURL=common.js.map
\ No newline at end of file
const { last, path, pathOr, pick, omitNil, propOr, postTask, Task, tapTask } = require('./purefuncs');
const JobManager = require('./job-manager');
const logger = require('./logger');
const log = logger('xxl-job-executor');
const xxlPostTask = ({ url, data, config }) => postTask(url, data, config);
class Executor {
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
* @param {string} accessToken
* @param {string} jobLogPath
* @param {Map} jobHandlers
* @param {*} context
*/
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
this.executorKey = executorKey;
this.scheduleCenterUrl = scheduleCenterUrl;
this.accessToken = accessToken;
this.jobHandlers = jobHandlers;
this.jobManager = new JobManager(jobLogPath, context);
}
/**
* 应用执行器中间件
* @param {*} app
* @param {string} appType
* @param {string} appDomain
* @param {string} uri
*/
applyMiddleware({ app, appType, appDomain, uri }) {
switch (appType) {
case 'EXPRESS': {
const Express = require('express');
const Router = Express.Router;
this.router = new Router();
this.initExpressRouter(uri);
app.use(this.router);
break;
}
default:
throw 'unsupported appType, just support express or koa';
}
this.appType = appType;
this.executorUrl = 'http://192.168.31.251:8088' + uri;
}
/**
* 初始化适用于express的router
* @param {string} uri
*/
initExpressRouter(uri) {
// authentication
this.router.use(uri, async (req, res, next) => {
res.status(200);
const { url, method, body } = req;
log.trace('%s %s %o', method, url, omitNil(pick(['jobId', 'executorHandler', 'executorParams', 'executorTimeout', 'logId', 'logDateTime'], body)));
const token = path(['headers', 'xxl-job-access-token'], req);
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' });
return;
}
if (!propOr(false, 'body', req)) {
res.send({ code: 500, msg: 'need apply body-parser middleware first' });
return;
}
await next();
});
this.addRoutes(uri);
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {string} baseUri
*/
addRoutes(baseUri) {
// detect whether the executor is online
this.router.post(`${baseUri}/beat`, async (...contexts) => {
const { res } = this.wrappedHandler(contexts);
res.send(this.beat());
});
// check whether is already have the same job is running
this.router.post(`${baseUri}/idleBeat`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
const jobId = pathOr(-1, ['body', 'jobId'], req);
res.send(this.idleBeat(jobId));
});
// trigger job
this.router.post(`${baseUri}/run`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
res.send(this.run(propOr({}, 'body', req)));
});
// kill job
this.router.post(`${baseUri}/kill`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
res.send(this.killJob(pathOr(-1, ['body', 'jobId'], req)));
});
// view job's execution log
this.router.post(`${baseUri}/log`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
const { logDateTim: logDateTime, logId, fromLineNum } = propOr({}, 'body', req);
const data = await this.readLog(logDateTime, logId, fromLineNum);
res.send(data);
});
}
/**
* 将koa和express的request body处理成相同的结构,方便后边router处理
* @param {any} contexts
* @return {Object}
*/
wrappedHandler(contexts) {
switch (this.appType) {
case 'EXPRESS': {
const [req, res] = contexts;
return { req, res };
}
case 'KOA': {
const [ctx] = contexts;
return { req: propOr({}, 'request', ctx), res: { send: (body) => ctx.body = body } };
}
}
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
beat() {
return { code: 200, msg: 'success' };
}
/**
* 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
* @param {string} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
}
/**
* 触发任务执行
* @param {number} jobId - 任务ID
* @param {string} handlerName - 任务的handler名字
* @param {string} jobJsonParams - 任务参数
* @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
* @param {number} logId - 本次调度日志ID
* @param {number} - 本次调度日志时间
* @return {{code: number, msg: string}}
*/
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName);
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` };
}
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
return { code: 200, msg: 'success' };
}
/**
* 终止任务
* @param {number} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` };
}
/**
* 查看执行日志
* @param {number} logDateTime - 本次调度日志时间
* @param {number} logId - 本次调度日志ID
* @param {number} fromLineNum - 日志开始行号
* @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
*/
async readLog(logDateTime, logId, fromLineNum) {
let logContent;
let toLineNum;
try {
const lines = await this.jobManager.readJobLog(logDateTime, logId);
lines.splice(0, fromLineNum - 1);
if (last(lines) === '')
lines.pop();
toLineNum = fromLineNum + lines.length - 1;
lines.unshift('');
logContent = lines.join('\n');
}
catch (err) {
log.err('readLog error: %o', err.message || err);
toLineNum = fromLineNum;
logContent = err.toString();
}
return { code: 200, content: { fromLineNum, toLineNum, logContent } };
}
/**
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
const url = `${this.scheduleCenterUrl}/api/registry`;
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
const headers = { 'xxl-job-access-token': this.accessToken };
await xxlPostTask({ url, data, config: { headers } })
.chain(tapTask((response) => log.trace('registry %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry error: %o', err.message || err);
return Task.of();
})
.run().promise();
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
const url = `${this.scheduleCenterUrl}/api/registryRemove`;
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
const headers = { 'xxl-job-access-token': this.accessToken };
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('registry remove %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry remove error: %o', err.message || err);
return Task.of();
}).run().promise();
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {*} error
* @param {{logId: number, result: any}} jobResult
*/
async callback(error, { logId, result }) {
const url = `${this.scheduleCenterUrl}/api/callback`;
const headers = { 'xxl-job-access-token': this.accessToken };
const handleCode = error ? 500 : 200;
const handleMsg = error ? error.message || error.toString() : (result ? JSON.stringify(result) : 'success');
const data = [{ logId, logDateTim: Date.now(), handleCode, handleMsg }];
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('callback %o ==> %o', data[0], omitNil(propOr({}, 'data', response)))))
.orElse(tapTask((err) => log.err('callback error: %o', err.message || err)))
.run().promise();
}
}
module.exports = Executor;
//# sourceMappingURL=executor.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Executor = void 0;
const JobManager = require('./job-manager');
const logger = require('./logger');
const log = logger('xxl-job-executor');
const axios_1 = require("axios");
class Executor {
executorKey;
scheduleCenterUrl;
accessToken;
executorUrl;
jobHandlers;
jobManager;
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
* @param {string} accessToken
* @param {string} jobLogPath
* @param {Map} jobHandlers
* @param {*} context
*/
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
this.executorKey = executorKey;
this.scheduleCenterUrl = scheduleCenterUrl;
this.accessToken = accessToken;
this.jobHandlers = jobHandlers;
this.jobManager = new JobManager(jobLogPath, context);
}
/**
* 应用执行器中间件
* @param {*} app
* @param {string} app_domain
* @param {string} uri
*/
applyMiddleware({ app, app_domain, uri }) {
const express = require('express');
var router = new express.Router();
this.initRouter(router, uri);
this.addJobRoutes(router, uri);
app.use(router);
this.executorUrl = (app_domain || 'http://192.168.31.251:8088') + uri;
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
initRouter(router, uri) {
router.use(uri, async (req, res, next) => {
res.status(200);
const token = req.headers && req.headers['xxl-job-access-token'];
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' });
return;
}
if (!req.body) {
res.send({ code: 500, msg: 'body is null' });
return;
}
next();
});
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
addJobRoutes(router, baseUri) {
router.post(`${baseUri}/beat`, async (req, res, next) => {
res.send(this.beat());
});
router.post(`${baseUri}/idleBeat`, async (req, res, next) => {
res.send(this.idleBeat(req.body.jobId || -1));
});
router.post(`${baseUri}/run`, async (req, res, next) => {
res.send(this.run(req.body || {}));
});
router.post(`${baseUri}/kill`, async (req, res, next) => {
res.send(this.killJob(req.body.jobId || -1));
});
router.post(`${baseUri}/log`, async (req, res, next) => {
const { logDateTim: logDateTime, logId, fromLineNum } = req.body || {};
const data = await this.readLog(logDateTime, logId, fromLineNum);
res.send(data);
});
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
beat() {
return { code: 200, msg: 'success' };
}
/**
* 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
* @param {string} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
}
/**
* 触发任务执行
* @param {number} jobId - 任务ID
* @param {string} handlerName - 任务的handler名字
* @param {string} jobJsonParams - 任务参数
* @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
* @param {number} logId - 本次调度日志ID
* @param {number} - 本次调度日志时间
* @return {{code: number, msg: string}}
*/
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName);
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` };
}
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
return { code: 200, msg: 'success' };
}
/**
* 终止任务
* @param {number} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` };
}
/**
* 查看执行日志
* @param {number} logDateTime - 本次调度日志时间
* @param {number} logId - 本次调度日志ID
* @param {number} fromLineNum - 日志开始行号
* @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
*/
async readLog(logDateTime, logId, fromLineNum) {
let logContent;
let toLineNum;
try {
const lines = await this.jobManager.readJobLog(logDateTime, logId);
lines.splice(0, fromLineNum - 1);
if (last(lines) === '')
lines.pop();
toLineNum = fromLineNum + lines.length - 1;
lines.unshift('');
logContent = lines.join('\n');
}
catch (err) {
log.err('readLog error: %o', err.message);
toLineNum = fromLineNum;
logContent = err.toString();
}
return { code: 200, content: { fromLineNum, toLineNum, logContent } };
}
/**
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
await axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, {
registryGroup: 'EXECUTOR',
registryKey: this.executorKey,
registryValue: this.executorUrl
}, {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器注册成功:', response.data)).catch(error => console.error('执行器注册失败:', error.message));
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
await axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, {
'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey,
'registryValue': this.executorUrl
}, {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message));
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {Error|null} error 错误对象
* @param {{logId: number, result: any}} jobResult 任务结果
*/
async callback(error, { logId, result }) {
return axios_1.default.post(`${this.scheduleCenterUrl}/api/callback`, [{
logId,
logDateTim: Date.now(),
handleCode: error ? 500 : 200,
handleMsg: error ? error.message : (result ? JSON.stringify(result) : 'success')
}], {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => response.data).catch((err) => {
log.error(`callback error:${JSON.stringify(result)} ${err.message}`);
});
}
}
exports.Executor = Executor;
//# sourceMappingURL=executor.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.XxlJobExecutor = void 0;
const Executor = require("./executor");
const { isNilOrEmpty, notEmpty } = require('./src/purefuncs');
/**
......@@ -26,8 +27,8 @@ class XxlJobExecutor {
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job
*/
applyMiddleware({ app, appDomain, path: uri }) {
this.executor.applyMiddleware({ app, appDomain, uri });
applyMiddleware({ app, domain, path: uri }) {
this.executor.applyMiddleware({ app, domain, uri });
const registry = this.executor.registry.bind(this.executor);
registry() && setInterval(registry, 30000);
}
......@@ -38,4 +39,5 @@ class XxlJobExecutor {
await this.executor.registryRemove();
}
}
exports.XxlJobExecutor = XxlJobExecutor;
//# sourceMappingURL=index.js.map
\ No newline at end of file
......@@ -20,6 +20,8 @@ try {
port: 8088,
// 前端地址
app_domain: 'http://[2408:8352:602:1100:1c8a:80f5:da47:66ef]:8088',
};
}
......
......@@ -11,6 +11,8 @@
"author": "",
"license": "ISC",
"dependencies": {
"axios": "^1.9.0",
"body-parser": "^2.2.0",
"express": "^5.1.0"
},
"devDependencies": {
......
import * as express from 'express'
const config = require('../config/config.js')
import * as express from "express";
import { job_handlers } from './job/index'
import { XxlJobExecutor } from './xxl_job/index'
// 实例化 XxlJobExecutor 组件
const XxlJobExecutor = require('../index')
const { jobHandlers } = require('./jobHandlers')
const context = { /* anything*/ }
const xxlJobExecutor = new XxlJobExecutor(jobHandlers, context)
// 实例化 express app
const app = require('express')()
const app = express()
app.use(require('body-parser').json())
// 应用 XxlJobExecutor 组件
xxlJobExecutor.applyMiddleware({ app, appType: 'EXPRESS', appDomain: 'http://[2408:8352:602:1100:1c8a:80f5:da47:66ef]:8088', path: '' })
new XxlJobExecutor(config.xxl_job, job_handlers).applyMiddleware({ app, appDomain: config.app_domain, path: '' })
app.listen(config.port, () => {
......
const job_handlers = new Map();
export { job_handlers }
\ No newline at end of file
import { sleep } from '../../libs/common'
/**
* demo任务
* @param {any} jobLogger 由xxl-job组件定义的任务logger,会将日志内容输出到文件,可在调度中心查看执行日志
* @param {{ jobParam1: any, jobParam2: any }} jobParams 任务参数
* @param {Object} context 任务上下文
* @return {Promise<void>} 函数必须返回一个 promise
*/
export async function demoJobHandler(jobLogger, jobParams, context) {
jobLogger.debug('params: %o, context: %o', jobParams, context)
for (let i = 1; i < 10; i++) {
await sleep(1000)
jobLogger.debug(`${i}s passed`)
}
}
export async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
\ No newline at end of file
const { last, path, pathOr, pick, omitNil, propOr, postTask, Task, tapTask } = require('./purefuncs')
const JobManager = require('./job-manager')
const logger = require('./logger')
const log = logger('xxl-job-executor')
const xxlPostTask = ({ url, data, config }) => postTask(url, data, config)
import axios from 'axios';
export class Executor {
executorKey
scheduleCenterUrl
accessToken
executorUrl
jobHandlers
jobManager
class Executor {
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
......@@ -25,105 +32,78 @@ class Executor {
/**
* 应用执行器中间件
* @param {*} app
* @param {string} appType
* @param {string} appDomain
* @param {string} app_domain
* @param {string} uri
*/
applyMiddleware({ app, appType, appDomain, uri }) {
switch (appType) {
case 'EXPRESS': {
const Express = require('express')
const Router = Express.Router
this.router = new Router()
this.initExpressRouter(uri)
app.use(this.router)
break
}
default:
throw 'unsupported appType, just support express or koa'
}
this.appType = appType
this.executorUrl = 'http://192.168.31.251:8088' + uri
applyMiddleware({ app, app_domain, uri }) {
const express = require('express')
var router = new express.Router()
this.initRouter(router, uri)
this.addJobRoutes(router, uri)
app.use(router)
this.executorUrl = (app_domain || 'http://192.168.31.251:8088') + uri
}
/**
* 初始化适用于express的router
* @param {string} uri
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
initExpressRouter(uri) {
// authentication
this.router.use(uri, async (req, res, next) => {
initRouter(router: any, uri: string) {
router.use(uri, async (req, res, next) => {
res.status(200)
const { url, method, body } = req
log.trace('%s %s %o', method, url, omitNil(pick(['jobId', 'executorHandler', 'executorParams', 'executorTimeout', 'logId', 'logDateTime'], body)))
const token = path(['headers', 'xxl-job-access-token'], req)
const token = req.headers && req.headers['xxl-job-access-token']
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' })
return
}
if (!propOr(false, 'body', req)) {
res.send({ code: 500, msg: 'need apply body-parser middleware first' })
if (!req.body) {
res.send({ code: 500, msg: 'body is null' })
return
}
await next()
next()
})
this.addRoutes(uri)
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
addRoutes(baseUri) {
// detect whether the executor is online
this.router.post(`${baseUri}/beat`, async (...contexts) => {
const { res } = this.wrappedHandler(contexts)
addJobRoutes(router, baseUri) {
router.post(`${baseUri}/beat`, async (req, res, next) => {
res.send(this.beat())
})
// check whether is already have the same job is running
this.router.post(`${baseUri}/idleBeat`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
const jobId = pathOr(-1, ['body', 'jobId'], req)
res.send(this.idleBeat(jobId))
router.post(`${baseUri}/idleBeat`, async (req, res, next) => {
res.send(this.idleBeat(req.body.jobId || -1))
})
// trigger job
this.router.post(`${baseUri}/run`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
res.send(this.run(propOr({}, 'body', req)))
router.post(`${baseUri}/run`, async (req, res, next) => {
res.send(this.run(req.body || {}))
})
// kill job
this.router.post(`${baseUri}/kill`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
res.send(this.killJob(pathOr(-1, ['body', 'jobId'], req)))
router.post(`${baseUri}/kill`, async (req, res, next) => {
res.send(this.killJob(req.body.jobId || -1))
})
// view job's execution log
this.router.post(`${baseUri}/log`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
const { logDateTim: logDateTime, logId, fromLineNum } = propOr({}, 'body', req)
router.post(`${baseUri}/log`, async (req, res, next) => {
const { logDateTim: logDateTime, logId, fromLineNum } = req.body || {}
const data = await this.readLog(logDateTime, logId, fromLineNum)
res.send(data)
})
}
/**
* 将koa和express的request body处理成相同的结构,方便后边router处理
* @param {any} contexts
* @return {Object}
*/
wrappedHandler(contexts) {
switch (this.appType) {
case 'EXPRESS': {
const [req, res] = contexts
return { req, res }
}
case 'KOA': {
const [ctx] = contexts
return { req: propOr({}, 'request', ctx), res: { send: (body) => ctx.body = body } }
}
}
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
......@@ -189,7 +169,7 @@ class Executor {
lines.unshift('')
logContent = lines.join('\n')
} catch (err) {
log.err('readLog error: %o', err.message || err)
log.err('readLog error: %o', err.message)
toLineNum = fromLineNum
logContent = err.toString()
}
......@@ -200,53 +180,59 @@ class Executor {
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
const url = `${this.scheduleCenterUrl}/api/registry`
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl }
const headers = { 'xxl-job-access-token': this.accessToken }
await xxlPostTask({ url, data, config: { headers } })
.chain(tapTask((response) => log.trace('registry %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry error: %o', err.message || err)
return Task.of()
})
.run().promise()
await axios.post(`${this.scheduleCenterUrl}/api/registry`,
{
registryGroup: 'EXECUTOR',
registryKey: this.executorKey,
registryValue: this.executorUrl
},
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器注册成功:', response.data)).catch(error => console.error('执行器注册失败:', error.message))
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
const url = `${this.scheduleCenterUrl}/api/registryRemove`
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl }
const headers = { 'xxl-job-access-token': this.accessToken }
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('registry remove %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry remove error: %o', err.message || err)
return Task.of()
}).run().promise()
await axios.post(`${this.scheduleCenterUrl}/api/registryRemove`,
{
'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey,
'registryValue': this.executorUrl
},
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message))
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {*} error
* @param {{logId: number, result: any}} jobResult
* @param {Error|null} error 错误对象
* @param {{logId: number, result: any}} jobResult 任务结果
*/
async callback(error, { logId, result }) {
const url = `${this.scheduleCenterUrl}/api/callback`
const headers = { 'xxl-job-access-token': this.accessToken }
const handleCode = error ? 500 : 200
const handleMsg = error ? error.message || error.toString() : (result ? JSON.stringify(result) : 'success')
const data = [{ logId, logDateTim: Date.now(), handleCode, handleMsg }]
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('callback %o ==> %o', data[0], omitNil(propOr({}, 'data', response)))))
.orElse(tapTask((err) => log.err('callback error: %o', err.message || err)))
.run().promise()
return axios.post(
`${this.scheduleCenterUrl}/api/callback`,
[{
logId,
logDateTim: Date.now(),
handleCode: error ? 500 : 200,
handleMsg: error ? error.message : (result ? JSON.stringify(result) : 'success')
}],
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => response.data).catch((err) => {
log.error(`callback error:${JSON.stringify(result)} ${err.message}`);
})
}
}
module.exports = Executor
......@@ -4,7 +4,7 @@ const { isNilOrEmpty, notEmpty } = require('./src/purefuncs')
/**
* XxlJobExecutor
*/
class XxlJobExecutor {
export class XxlJobExecutor {
executor;
......@@ -35,8 +35,8 @@ class XxlJobExecutor {
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job
*/
applyMiddleware({ app, appDomain, path: uri }) {
this.executor.applyMiddleware({ app, appDomain, uri })
applyMiddleware({ app, domain, path: uri }) {
this.executor.applyMiddleware({ app, domain, uri })
const registry = this.executor.registry.bind(this.executor)
registry() && setInterval(registry, 30000)
}
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!