Commit b9232309 Harvey

no message

1 个父辈 0192d7f1
var config;
try {
try {
config = JSON.parse(process.env.CONFIG);
}
catch (e) {
config = require('../config/config.js');
}
}
catch (e) {
console.log(e);
}
if (!config) {
console.log('配置获取失败');
process.exit();
}
console.log(config);
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const config = require('../config/config.js');
// 实例化 XxlJobExecutor 组件
const XxlJobExecutor = require('../index');
const { jobHandlers } = require('./jobHandlers');
const context = { /* anything*/};
const xxlJobExecutor = new XxlJobExecutor(jobHandlers, context);
// 实例化 express app
const app = require('express')();
app.use(require('body-parser').json());
// 应用 XxlJobExecutor 组件
xxlJobExecutor.applyMiddleware({ app, appType: 'EXPRESS', appDomain: 'http://[2408:8352:602:1100:1c8a:80f5:da47:66ef]:8088', path: '' });
app.listen(config.port, () => {
console.log(`job-ydn-zq app listening on port ${config.port}`);
});
//# sourceMappingURL=index.js.map
\ No newline at end of file
const { last, path, pathOr, pick, omitNil, propOr, postTask, Task, tapTask } = require('./purefuncs');
const JobManager = require('./job-manager');
const logger = require('./logger');
const log = logger('xxl-job-executor');
const xxlPostTask = ({ url, data, config }) => postTask(url, data, config);
class Executor {
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
* @param {string} accessToken
* @param {string} jobLogPath
* @param {Map} jobHandlers
* @param {*} context
*/
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
this.executorKey = executorKey;
this.scheduleCenterUrl = scheduleCenterUrl;
this.accessToken = accessToken;
this.jobHandlers = jobHandlers;
this.jobManager = new JobManager(jobLogPath, context);
}
/**
* 应用执行器中间件
* @param {*} app
* @param {string} appType
* @param {string} appDomain
* @param {string} uri
*/
applyMiddleware({ app, appType, appDomain, uri }) {
switch (appType) {
case 'EXPRESS': {
const Express = require('express');
const Router = Express.Router;
this.router = new Router();
this.initExpressRouter(uri);
app.use(this.router);
break;
}
default:
throw 'unsupported appType, just support express or koa';
}
this.appType = appType;
this.executorUrl = 'http://192.168.31.251:8088' + uri;
}
/**
* 初始化适用于express的router
* @param {string} uri
*/
initExpressRouter(uri) {
// authentication
this.router.use(uri, async (req, res, next) => {
res.status(200);
const { url, method, body } = req;
log.trace('%s %s %o', method, url, omitNil(pick(['jobId', 'executorHandler', 'executorParams', 'executorTimeout', 'logId', 'logDateTime'], body)));
const token = path(['headers', 'xxl-job-access-token'], req);
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' });
return;
}
if (!propOr(false, 'body', req)) {
res.send({ code: 500, msg: 'need apply body-parser middleware first' });
return;
}
await next();
});
this.addRoutes(uri);
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {string} baseUri
*/
addRoutes(baseUri) {
// detect whether the executor is online
this.router.post(`${baseUri}/beat`, async (...contexts) => {
const { res } = this.wrappedHandler(contexts);
res.send(this.beat());
});
// check whether is already have the same job is running
this.router.post(`${baseUri}/idleBeat`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
const jobId = pathOr(-1, ['body', 'jobId'], req);
res.send(this.idleBeat(jobId));
});
// trigger job
this.router.post(`${baseUri}/run`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
res.send(this.run(propOr({}, 'body', req)));
});
// kill job
this.router.post(`${baseUri}/kill`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
res.send(this.killJob(pathOr(-1, ['body', 'jobId'], req)));
});
// view job's execution log
this.router.post(`${baseUri}/log`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts);
const { logDateTim: logDateTime, logId, fromLineNum } = propOr({}, 'body', req);
const data = await this.readLog(logDateTime, logId, fromLineNum);
res.send(data);
});
}
/**
* 将koa和express的request body处理成相同的结构,方便后边router处理
* @param {any} contexts
* @return {Object}
*/
wrappedHandler(contexts) {
switch (this.appType) {
case 'EXPRESS': {
const [req, res] = contexts;
return { req, res };
}
case 'KOA': {
const [ctx] = contexts;
return { req: propOr({}, 'request', ctx), res: { send: (body) => ctx.body = body } };
}
}
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
beat() {
return { code: 200, msg: 'success' };
}
/**
* 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
* @param {string} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
}
/**
* 触发任务执行
* @param {number} jobId - 任务ID
* @param {string} handlerName - 任务的handler名字
* @param {string} jobJsonParams - 任务参数
* @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
* @param {number} logId - 本次调度日志ID
* @param {number} - 本次调度日志时间
* @return {{code: number, msg: string}}
*/
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName);
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` };
}
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
return { code: 200, msg: 'success' };
}
/**
* 终止任务
* @param {number} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` };
}
/**
* 查看执行日志
* @param {number} logDateTime - 本次调度日志时间
* @param {number} logId - 本次调度日志ID
* @param {number} fromLineNum - 日志开始行号
* @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
*/
async readLog(logDateTime, logId, fromLineNum) {
let logContent;
let toLineNum;
try {
const lines = await this.jobManager.readJobLog(logDateTime, logId);
lines.splice(0, fromLineNum - 1);
if (last(lines) === '')
lines.pop();
toLineNum = fromLineNum + lines.length - 1;
lines.unshift('');
logContent = lines.join('\n');
}
catch (err) {
log.err('readLog error: %o', err.message || err);
toLineNum = fromLineNum;
logContent = err.toString();
}
return { code: 200, content: { fromLineNum, toLineNum, logContent } };
}
/**
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
const url = `${this.scheduleCenterUrl}/api/registry`;
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
const headers = { 'xxl-job-access-token': this.accessToken };
await xxlPostTask({ url, data, config: { headers } })
.chain(tapTask((response) => log.trace('registry %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry error: %o', err.message || err);
return Task.of();
})
.run().promise();
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
const url = `${this.scheduleCenterUrl}/api/registryRemove`;
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
const headers = { 'xxl-job-access-token': this.accessToken };
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('registry remove %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry remove error: %o', err.message || err);
return Task.of();
}).run().promise();
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {*} error
* @param {{logId: number, result: any}} jobResult
*/
async callback(error, { logId, result }) {
const url = `${this.scheduleCenterUrl}/api/callback`;
const headers = { 'xxl-job-access-token': this.accessToken };
const handleCode = error ? 500 : 200;
const handleMsg = error ? error.message || error.toString() : (result ? JSON.stringify(result) : 'success');
const data = [{ logId, logDateTim: Date.now(), handleCode, handleMsg }];
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('callback %o ==> %o', data[0], omitNil(propOr({}, 'data', response)))))
.orElse(tapTask((err) => log.err('callback error: %o', err.message || err)))
.run().promise();
}
}
module.exports = Executor;
//# sourceMappingURL=executor.js.map
\ No newline at end of file
const { once } = require('events');
const fs = require('fs');
const readline = require('readline');
const mkdir = (path) => !fs.existsSync(path) && fs.mkdirSync(path, { recursive: true });
const searchInFile = async (filePath, content, endContent) => {
const rl = readline.createInterface({ input: fs.createReadStream(filePath), crlfDelay: Infinity });
const reg = content ? new RegExp(content, 'i') : undefined;
const endReg = endContent ? new RegExp(endContent, 'i') : undefined;
const lines = [];
let stop = false;
rl.on('line', (line) => {
if (stop) {
rl.close();
return;
}
reg && reg.test(line) && lines.push(line);
endReg && endReg.test(line) && (stop = true) && rl.close();
});
await once(rl, 'close');
return lines;
};
module.exports = {
mkdir,
searchInFile,
};
//# sourceMappingURL=file.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const Executor = require("./executor");
const { isNilOrEmpty, notEmpty } = require('./src/purefuncs');
/**
* XxlJobExecutor
*/
class XxlJobExecutor {
executor;
/**
* 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
* @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis }
*/
constructor(opts, jobHandlers) {
const { XXL_JOB_EXECUTOR_KEY: executorKey, XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl, XXL_JOB_ACCESS_TOKEN: accessToken, XXL_JOB_JOB_LOG_PATH: jobLogPath, } = opts;
const parameters = { executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers };
// const invalidParameters = Object.entries(parameters).filter(([, value]) => isNilOrEmpty(value))
// if (notEmpty(invalidParameters)) throw `invalid parameter: ${invalidParameters.map(([key]) => key).join(',')}`
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers);
}
/**
* 应用执行器组件
* @param {Object} args
* @param {any} args.app 执行器server, express
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job
*/
applyMiddleware({ app, appDomain, path: uri }) {
this.executor.applyMiddleware({ app, appDomain, uri });
const registry = this.executor.registry.bind(this.executor);
registry() && setInterval(registry, 30000);
}
/**
* 关闭服务前应调用该方法,将执行器从调度中心摘除
*/
async close() {
await this.executor.registryRemove();
}
}
//# sourceMappingURL=index.js.map
\ No newline at end of file
const fs = require('fs');
const moment = require('moment');
const Path = require('path');
const logger = require('./logger');
const { Task, tapTask } = require('./purefuncs');
const { mkdir, searchInFile } = require('./file');
const log = logger('xxl-job-manager');
/**
* 任务管理
*/
class JobManager {
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor(jobLogPath, context) {
mkdir(jobLogPath);
this.jobLogPath = jobLogPath;
this.context = context;
this.runningJobs = new Set();
}
/**
* 根据调度时间获取日志文件路径
* @param {number} dateTime
* @return {string}
*/
getLogFilePath(dateTime) {
return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`);
}
/**
* 构造任务logger的namespace
* @param {string} handlerName
* @param {number} dateTime
* @param {number} logId
* @return {string}
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`;
}
/**
* @param {{number}} jobId
* @return {boolean}
*/
hasJob(jobId) {
return this.runningJobs.has(jobId);
}
/**
* @param {number} jobId
* @param {string} jobJsonParams
* @param {number} logId
* @param {number} logDateTime
* @param {number} executorTimeout
* @param {string} handlerName
* @param {function} jobHandler
* @param {function} callback
*/
runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
let timeout = undefined;
const logNameSpace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
const logFilePath = this.getLogFilePath(logDateTime);
const jobLogger = logger(logNameSpace, logFilePath);
Task.of(jobJsonParams)
.chain((jobJsonParams) => Task.of(jobJsonParams ? JSON.parse(jobJsonParams) : {}))
.chain((jobParams) => {
jobLogger.trace('start');
// check duplicate job
if (this.hasJob(jobId))
return Task.rejected('There is already have a same job is running');
this.runningJobs.add(jobId);
// setup timeout
if (executorTimeout) {
timeout = setTimeout(async () => await this.finishJob({ jobId, logId, jobLogger, callback, timeout, error: new Error('timeout') }), executorTimeout * 1000);
}
return Task.fromPromised(jobHandler)(jobLogger, jobParams, this.context);
})
.chain((result) => Task.of({ result }))
.orElse((error) => Task.of({ error }))
.chain(tapTask(async ({ result, error }) => await this.finishJob({ jobId, logId, jobLogger, callback, timeout, result, error })))
.run().promise();
}
/**
* @param {number} logDateTime
* @param {number} logId
* @return {Promise<Array>}
*/
async readJobLog(logDateTime, logId) {
const logFilePath = this.getLogFilePath(logDateTime);
const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' ';
return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : [];
}
/**
* @param {number} jobId
* @param {number} logId
* @param {*} jobLogger
* @param {function} callback
* @param {number} timeout
* @param {*} result
* @param {*} error
* @return {Promise<void>}
*/
async finishJob({ jobId, logId, jobLogger, callback, timeout, result, error }) {
try {
timeout && clearTimeout(timeout);
result && jobLogger.trace('result: %o', result);
error && jobLogger.err('error: %o', error.message || error);
jobLogger.trace('end');
jobLogger.close();
await callback(error, { logId, result });
}
catch (err) {
log.err('finishJob error: %o', err.message || err);
}
this.runningJobs.delete(jobId);
}
}
module.exports = JobManager;
//# sourceMappingURL=job-manager.js.map
\ No newline at end of file
const debug = require('debug');
const fs = require('fs');
const os = require('os');
const util = require('util');
const { always, propOr } = require('./purefuncs');
const enableExecutorDebugLog = /^(yes|on|true|enable|enabled|1)$/i.test(`${propOr(false, 'XXL_JOB_DEBUG_LOG', process.env)}`);
const enableLogLevels = propOr('info:*,warn:*,error:*,debug:*,trace:*', 'DEBUG', process.env);
const writeStreamOptions = { flags: 'a', encoding: 'utf8', autoClose: true, emitClose: true };
const noop = always(undefined);
const noopLogger = { info: noop, err: noop, debug: noop, warn: noop, trace: noop };
const dErr = debug('error');
const dInfo = debug('info');
const dWarn = debug('warn');
const dDebug = debug('debug');
const dTrace = debug('trace');
// 自定义对象,包装 debug 模拟日志级别
const createLogger = (ns) => {
const logger = {
info: dInfo.extend(ns),
err: dErr.extend(ns),
debug: dDebug.extend(ns),
warn: dWarn.extend(ns),
trace: dTrace.extend(ns),
};
Object.values(logger).forEach((levelLogger) => Object.assign(levelLogger, { enabled: true, useColors: false }));
return logger;
};
module.exports = (ns, logFilePath) => {
// 1. 执行器运行日志,输出到 stderr,限制日志级别
if (!logFilePath) {
if (!enableExecutorDebugLog)
return noopLogger;
const logger = createLogger(ns);
Object.entries(logger).forEach(([level, levelLogger]) => levelLogger.enabled = enableLogLevels.includes(level));
return logger;
}
// 2. 任务执行日志,同时输出到 stderr 和 文件,stderr 限制日志级别,输出到文件不限制级别以供调度中心全量查看
const writeStream = fs.createWriteStream(logFilePath, writeStreamOptions);
const log2File = (...args) => writeStream.write(`${util.format(...args)}${os.EOL}`);
const log2Stderr = (...args) => console.error(util.format(...args));
const log2FileAndStderr = (...args) => {
const content = util.format(...args);
writeStream.write(`${content}${os.EOL}`);
console.error(content);
};
const logger = createLogger(ns);
// 设置输出
Object.entries(logger).forEach(([level, levelLogger]) => {
levelLogger.log = enableLogLevels.includes(level) ? log2FileAndStderr : log2File;
});
// 任务执行完成,关闭文件输出流,后续日志只输出到 stderr
logger.close = () => {
Object.entries(logger).forEach(([level, levelLogger]) => {
Object.assign(levelLogger, { enabled: enableLogLevels.includes(level), log: log2Stderr });
});
writeStream.end();
};
return logger;
};
//# sourceMappingURL=logger.js.map
\ No newline at end of file
const Axios = require('axios');
const FC = require('folktale/concurrency');
const R = require('ramda');
const always = R.always;
const anyPass = R.anyPass;
const last = R.last;
const compose = R.compose;
const pick = R.pick;
const propOr = R.propOr;
const path = R.path;
const pathOr = R.pathOr;
const reject = R.reject;
const tap = R.tap;
const not = R.not;
const isNil = R.isNil;
const isEmpty = R.isEmpty;
const Task = FC.task;
const notEmpty = compose(not, isEmpty);
const omitNil = reject(isNil);
const isNilOrEmpty = anyPass([isNil, isEmpty]);
const tapTask = (f) => compose(Task.of, tap(f));
const postTask = Task.fromPromised(Axios.post);
module.exports = {
always, last, compose, pick, propOr, path, pathOr, tap, not, isNil, isEmpty, Task, notEmpty, omitNil,
isNilOrEmpty, tapTask, postTask,
};
//# sourceMappingURL=purefuncs.js.map
\ No newline at end of file
var config = {
// 执行器AppName,在调度中心配置执行器时使用
XXL_JOB_EXECUTOR_KEY: "executor-job-ydn-zq",
// 调度中心地址
XXL_JOB_SCHEDULE_CENTER_URL: "http://xxljob.ydniu.com/xxl-job-admin",
// 调度中心设置的请求令牌,调度中心和执行器都会进行校验,双方AccessToken匹配才允许通讯
XXL_JOB_ACCESS_TOKEN: "default_token",
// 任务执行日志的存储路径
XXL_JOB_JOB_LOG_PATH: "logs/job",
// 执行器运行日志开关(非任务执行日志),默认关闭
XXL_JOB_DEBUG_LOG: true,
};
let config;
try {
config = JSON.parse(process.env.CONFIG);
} catch (e) {
config = {
xxl_job: {
// 执行器AppName,在调度中心配置执行器时使用
XXL_JOB_EXECUTOR_KEY: "executor-job-ydn-zq",
// 调度中心地址
XXL_JOB_SCHEDULE_CENTER_URL: "http://xxljob.ydniu.com/xxl-job-admin",
// 调度中心设置的请求令牌,调度中心和执行器都会进行校验,双方AccessToken匹配才允许通讯
XXL_JOB_ACCESS_TOKEN: "default_token",
// 任务执行日志的存储路径
XXL_JOB_JOB_LOG_PATH: "logs/job",
// 执行器运行日志开关(非任务执行日志),默认关闭
XXL_JOB_DEBUG_LOG: true,
},
port: 8088,
};
}
module.exports = config;
var config;
try {
try {
config = JSON.parse(process.env.CONFIG)
} catch (e) { config = require('../config/config.js') }
} catch (e) {
console.log(e)
}
if (!config) {
console.log('配置获取失败')
process.exit()
}
console.log(config)
\ No newline at end of file
const config = require('../config/config.js')
import * as express from "express";
// 实例化 XxlJobExecutor 组件
const XxlJobExecutor = require('../index')
const { jobHandlers } = require('./jobHandlers')
const context = { /* anything*/ }
const xxlJobExecutor = new XxlJobExecutor(jobHandlers, context)
// 实例化 express app
const app = require('express')()
app.use(require('body-parser').json())
// 应用 XxlJobExecutor 组件
xxlJobExecutor.applyMiddleware({ app, appType: 'EXPRESS', appDomain: 'http://[2408:8352:602:1100:1c8a:80f5:da47:66ef]:8088', path: '' })
app.listen(config.port, () => {
console.log(`job-ydn-zq app listening on port ${config.port}`)
})
const { last, path, pathOr, pick, omitNil, propOr, postTask, Task, tapTask } = require('./purefuncs')
const JobManager = require('./job-manager')
const logger = require('./logger')
const log = logger('xxl-job-executor')
const xxlPostTask = ({ url, data, config }) => postTask(url, data, config)
class Executor {
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
* @param {string} accessToken
* @param {string} jobLogPath
* @param {Map} jobHandlers
* @param {*} context
*/
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
this.executorKey = executorKey
this.scheduleCenterUrl = scheduleCenterUrl
this.accessToken = accessToken
this.jobHandlers = jobHandlers
this.jobManager = new JobManager(jobLogPath, context)
}
/**
* 应用执行器中间件
* @param {*} app
* @param {string} appType
* @param {string} appDomain
* @param {string} uri
*/
applyMiddleware({ app, appType, appDomain, uri }) {
switch (appType) {
case 'EXPRESS': {
const Express = require('express')
const Router = Express.Router
this.router = new Router()
this.initExpressRouter(uri)
app.use(this.router)
break
}
default:
throw 'unsupported appType, just support express or koa'
}
this.appType = appType
this.executorUrl = 'http://192.168.31.251:8088' + uri
}
/**
* 初始化适用于express的router
* @param {string} uri
*/
initExpressRouter(uri) {
// authentication
this.router.use(uri, async (req, res, next) => {
res.status(200)
const { url, method, body } = req
log.trace('%s %s %o', method, url, omitNil(pick(['jobId', 'executorHandler', 'executorParams', 'executorTimeout', 'logId', 'logDateTime'], body)))
const token = path(['headers', 'xxl-job-access-token'], req)
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' })
return
}
if (!propOr(false, 'body', req)) {
res.send({ code: 500, msg: 'need apply body-parser middleware first' })
return
}
await next()
})
this.addRoutes(uri)
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {string} baseUri
*/
addRoutes(baseUri) {
// detect whether the executor is online
this.router.post(`${baseUri}/beat`, async (...contexts) => {
const { res } = this.wrappedHandler(contexts)
res.send(this.beat())
})
// check whether is already have the same job is running
this.router.post(`${baseUri}/idleBeat`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
const jobId = pathOr(-1, ['body', 'jobId'], req)
res.send(this.idleBeat(jobId))
})
// trigger job
this.router.post(`${baseUri}/run`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
res.send(this.run(propOr({}, 'body', req)))
})
// kill job
this.router.post(`${baseUri}/kill`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
res.send(this.killJob(pathOr(-1, ['body', 'jobId'], req)))
})
// view job's execution log
this.router.post(`${baseUri}/log`, async (...contexts) => {
const { req, res } = this.wrappedHandler(contexts)
const { logDateTim: logDateTime, logId, fromLineNum } = propOr({}, 'body', req)
const data = await this.readLog(logDateTime, logId, fromLineNum)
res.send(data)
})
}
/**
* 将koa和express的request body处理成相同的结构,方便后边router处理
* @param {any} contexts
* @return {Object}
*/
wrappedHandler(contexts) {
switch (this.appType) {
case 'EXPRESS': {
const [req, res] = contexts
return { req, res }
}
case 'KOA': {
const [ctx] = contexts
return { req: propOr({}, 'request', ctx), res: { send: (body) => ctx.body = body } }
}
}
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
beat() {
return { code: 200, msg: 'success' }
}
/**
* 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
* @param {string} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' })
}
/**
* 触发任务执行
* @param {number} jobId - 任务ID
* @param {string} handlerName - 任务的handler名字
* @param {string} jobJsonParams - 任务参数
* @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
* @param {number} logId - 本次调度日志ID
* @param {number} - 本次调度日志时间
* @return {{code: number, msg: string}}
*/
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName)
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` }
}
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this))
return { code: 200, msg: 'success' }
}
/**
* 终止任务
* @param {number} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` }
}
/**
* 查看执行日志
* @param {number} logDateTime - 本次调度日志时间
* @param {number} logId - 本次调度日志ID
* @param {number} fromLineNum - 日志开始行号
* @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
*/
async readLog(logDateTime, logId, fromLineNum) {
let logContent
let toLineNum
try {
const lines = await this.jobManager.readJobLog(logDateTime, logId)
lines.splice(0, fromLineNum - 1)
if (last(lines) === '') lines.pop()
toLineNum = fromLineNum + lines.length - 1
lines.unshift('')
logContent = lines.join('\n')
} catch (err) {
log.err('readLog error: %o', err.message || err)
toLineNum = fromLineNum
logContent = err.toString()
}
return { code: 200, content: { fromLineNum, toLineNum, logContent } }
}
/**
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
const url = `${this.scheduleCenterUrl}/api/registry`
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl }
const headers = { 'xxl-job-access-token': this.accessToken }
await xxlPostTask({ url, data, config: { headers } })
.chain(tapTask((response) => log.trace('registry %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry error: %o', err.message || err)
return Task.of()
})
.run().promise()
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
const url = `${this.scheduleCenterUrl}/api/registryRemove`
const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl }
const headers = { 'xxl-job-access-token': this.accessToken }
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('registry remove %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
.orElse((err) => {
log.err('registry remove error: %o', err.message || err)
return Task.of()
}).run().promise()
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {*} error
* @param {{logId: number, result: any}} jobResult
*/
async callback(error, { logId, result }) {
const url = `${this.scheduleCenterUrl}/api/callback`
const headers = { 'xxl-job-access-token': this.accessToken }
const handleCode = error ? 500 : 200
const handleMsg = error ? error.message || error.toString() : (result ? JSON.stringify(result) : 'success')
const data = [{ logId, logDateTim: Date.now(), handleCode, handleMsg }]
await Task.of({ url, data, config: { headers } })
.chain(xxlPostTask)
.chain(tapTask((response) => log.trace('callback %o ==> %o', data[0], omitNil(propOr({}, 'data', response)))))
.orElse(tapTask((err) => log.err('callback error: %o', err.message || err)))
.run().promise()
}
}
module.exports = Executor
const { once } = require('events')
const fs = require('fs')
const readline = require('readline')
const mkdir = (path) => !fs.existsSync(path) && fs.mkdirSync(path, { recursive: true })
const searchInFile = async (filePath, content, endContent) => {
const rl = readline.createInterface({ input: fs.createReadStream(filePath), crlfDelay: Infinity })
const reg = content ? new RegExp(content, 'i') : undefined
const endReg = endContent ? new RegExp(endContent, 'i') : undefined
const lines = []
let stop = false
rl.on('line', (line) => {
if (stop) {
rl.close()
return
}
reg && reg.test(line) && lines.push(line)
endReg && endReg.test(line) && (stop = true) && rl.close()
})
await once(rl, 'close')
return lines
}
module.exports = {
mkdir,
searchInFile,
}
import * as Executor from './executor';
const { isNilOrEmpty, notEmpty } = require('./src/purefuncs')
/**
* XxlJobExecutor
*/
class XxlJobExecutor {
executor;
/**
* 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
* @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis }
*/
constructor(opts, jobHandlers) {
const {
XXL_JOB_EXECUTOR_KEY: executorKey,
XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl,
XXL_JOB_ACCESS_TOKEN: accessToken,
XXL_JOB_JOB_LOG_PATH: jobLogPath,
} = opts;
const parameters = { executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers }
// const invalidParameters = Object.entries(parameters).filter(([, value]) => isNilOrEmpty(value))
// if (notEmpty(invalidParameters)) throw `invalid parameter: ${invalidParameters.map(([key]) => key).join(',')}`
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers)
}
/**
* 应用执行器组件
* @param {Object} args
* @param {any} args.app 执行器server, express
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job
*/
applyMiddleware({ app, appDomain, path: uri }) {
this.executor.applyMiddleware({ app, appDomain, uri })
const registry = this.executor.registry.bind(this.executor)
registry() && setInterval(registry, 30000)
}
/**
* 关闭服务前应调用该方法,将执行器从调度中心摘除
*/
async close() {
await this.executor.registryRemove()
}
}
\ No newline at end of file
const fs = require('fs')
const moment = require('moment')
const Path = require('path')
const logger = require('./logger')
const { Task, tapTask } = require('./purefuncs')
const { mkdir, searchInFile } = require('./file')
const log = logger('xxl-job-manager')
/**
* 任务管理
*/
class JobManager {
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor(jobLogPath, context) {
mkdir(jobLogPath)
this.jobLogPath = jobLogPath
this.context = context
this.runningJobs = new Set()
}
/**
* 根据调度时间获取日志文件路径
* @param {number} dateTime
* @return {string}
*/
getLogFilePath(dateTime) {
return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`)
}
/**
* 构造任务logger的namespace
* @param {string} handlerName
* @param {number} dateTime
* @param {number} logId
* @return {string}
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`
}
/**
* @param {{number}} jobId
* @return {boolean}
*/
hasJob(jobId) {
return this.runningJobs.has(jobId)
}
/**
* @param {number} jobId
* @param {string} jobJsonParams
* @param {number} logId
* @param {number} logDateTime
* @param {number} executorTimeout
* @param {string} handlerName
* @param {function} jobHandler
* @param {function} callback
*/
runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
let timeout = undefined
const logNameSpace = this.getJobLoggerNamespace(handlerName, logDateTime, logId)
const logFilePath = this.getLogFilePath(logDateTime)
const jobLogger = logger(logNameSpace, logFilePath)
Task.of(jobJsonParams)
.chain((jobJsonParams) => Task.of(jobJsonParams ? JSON.parse(jobJsonParams) : {}))
.chain((jobParams) => {
jobLogger.trace('start')
// check duplicate job
if (this.hasJob(jobId)) return Task.rejected('There is already have a same job is running')
this.runningJobs.add(jobId)
// setup timeout
if (executorTimeout) {
timeout = setTimeout(
async () => await this.finishJob({ jobId, logId, jobLogger, callback, timeout, error: new Error('timeout') }),
executorTimeout * 1000)
}
return Task.fromPromised(jobHandler)(jobLogger, jobParams, this.context)
})
.chain((result) => Task.of({ result }))
.orElse((error) => Task.of({ error }))
.chain(tapTask(async ({ result, error }) => await this.finishJob({ jobId, logId, jobLogger, callback, timeout, result, error })))
.run().promise()
}
/**
* @param {number} logDateTime
* @param {number} logId
* @return {Promise<Array>}
*/
async readJobLog(logDateTime, logId) {
const logFilePath = this.getLogFilePath(logDateTime)
const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' '
return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : []
}
/**
* @param {number} jobId
* @param {number} logId
* @param {*} jobLogger
* @param {function} callback
* @param {number} timeout
* @param {*} result
* @param {*} error
* @return {Promise<void>}
*/
async finishJob({ jobId, logId, jobLogger, callback, timeout, result, error }) {
try {
timeout && clearTimeout(timeout)
result && jobLogger.trace('result: %o', result)
error && jobLogger.err('error: %o', error.message || error)
jobLogger.trace('end')
jobLogger.close()
await callback(error, { logId, result })
} catch (err) {
log.err('finishJob error: %o', err.message || err)
}
this.runningJobs.delete(jobId)
}
}
module.exports = JobManager
const debug = require('debug')
const fs = require('fs')
const os = require('os')
const util = require('util')
const { always, propOr } = require('./purefuncs')
const enableExecutorDebugLog = /^(yes|on|true|enable|enabled|1)$/i.test(`${propOr(false, 'XXL_JOB_DEBUG_LOG', process.env)}`)
const enableLogLevels = propOr('info:*,warn:*,error:*,debug:*,trace:*', 'DEBUG', process.env)
const writeStreamOptions = { flags: 'a', encoding: 'utf8', autoClose: true, emitClose: true }
const noop = always(undefined)
const noopLogger = { info: noop, err: noop, debug: noop, warn: noop, trace: noop }
const dErr = debug('error')
const dInfo = debug('info')
const dWarn = debug('warn')
const dDebug = debug('debug')
const dTrace = debug('trace')
// 自定义对象,包装 debug 模拟日志级别
const createLogger = (ns) => {
const logger = {
info: dInfo.extend(ns),
err: dErr.extend(ns),
debug: dDebug.extend(ns),
warn: dWarn.extend(ns),
trace: dTrace.extend(ns),
}
Object.values(logger).forEach((levelLogger) => Object.assign(levelLogger, { enabled: true, useColors: false }))
return logger
}
module.exports = (ns, logFilePath) => {
// 1. 执行器运行日志,输出到 stderr,限制日志级别
if (!logFilePath) {
if (!enableExecutorDebugLog) return noopLogger
const logger = createLogger(ns)
Object.entries(logger).forEach(([level, levelLogger]) => levelLogger.enabled = enableLogLevels.includes(level))
return logger
}
// 2. 任务执行日志,同时输出到 stderr 和 文件,stderr 限制日志级别,输出到文件不限制级别以供调度中心全量查看
const writeStream = fs.createWriteStream(logFilePath, writeStreamOptions)
const log2File = (...args) => writeStream.write(`${util.format(...args)}${os.EOL}`)
const log2Stderr = (...args) => console.error(util.format(...args))
const log2FileAndStderr = (...args) => {
const content = util.format(...args)
writeStream.write(`${content}${os.EOL}`)
console.error(content)
}
const logger = createLogger(ns)
// 设置输出
Object.entries(logger).forEach(([level, levelLogger]) => {
levelLogger.log = enableLogLevels.includes(level) ? log2FileAndStderr : log2File
})
// 任务执行完成,关闭文件输出流,后续日志只输出到 stderr
logger.close = () => {
Object.entries(logger).forEach(([level, levelLogger]) => {
Object.assign(levelLogger, { enabled: enableLogLevels.includes(level), log: log2Stderr })
})
writeStream.end()
}
return logger
}
const Axios = require('axios')
const FC = require('folktale/concurrency')
const R = require('ramda')
const always = R.always
const anyPass = R.anyPass
const last = R.last
const compose = R.compose
const pick = R.pick
const propOr = R.propOr
const path = R.path
const pathOr = R.pathOr
const reject = R.reject
const tap = R.tap
const not = R.not
const isNil = R.isNil
const isEmpty = R.isEmpty
const Task = FC.task
const notEmpty = compose(not, isEmpty)
const omitNil = reject(isNil)
const isNilOrEmpty = anyPass([isNil, isEmpty])
const tapTask = (f) => compose(Task.of, tap(f))
const postTask = Task.fromPromised(Axios.post)
module.exports = {
always, last, compose, pick, propOr, path, pathOr, tap, not, isNil, isEmpty, Task, notEmpty, omitNil,
isNilOrEmpty, tapTask, postTask,
}
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!