executor.js 9.7 KB
const { last, path, pathOr, pick, omitNil, propOr, postTask, Task, tapTask } = require('./purefuncs');
const JobManager = require('./job-manager');
const logger = require('./logger');
const log = logger('xxl-job-executor');
const xxlPostTask = ({ url, data, config }) => postTask(url, data, config);
class Executor {
    /**
     * @param {string} executorKey
     * @param {string} scheduleCenterUrl
     * @param {string} accessToken
     * @param {string} jobLogPath
     * @param {Map} jobHandlers
     * @param {*} context
     */
    constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
        this.executorKey = executorKey;
        this.scheduleCenterUrl = scheduleCenterUrl;
        this.accessToken = accessToken;
        this.jobHandlers = jobHandlers;
        this.jobManager = new JobManager(jobLogPath, context);
    }
    /**
     * 应用执行器中间件
     * @param {*} app
     * @param {string} appType
     * @param {string} appDomain
     * @param {string} uri
     */
    applyMiddleware({ app, appType, appDomain, uri }) {
        switch (appType) {
            case 'EXPRESS': {
                const Express = require('express');
                const Router = Express.Router;
                this.router = new Router();
                this.initExpressRouter(uri);
                app.use(this.router);
                break;
            }
            default:
                throw 'unsupported appType, just support express or koa';
        }
        this.appType = appType;
        this.executorUrl = 'http://192.168.31.251:8088' + uri;
    }
    /**
     * 初始化适用于express的router
     * @param {string} uri
     */
    initExpressRouter(uri) {
        // authentication
        this.router.use(uri, async (req, res, next) => {
            res.status(200);
            const { url, method, body } = req;
            log.trace('%s %s %o', method, url, omitNil(pick(['jobId', 'executorHandler', 'executorParams', 'executorTimeout', 'logId', 'logDateTime'], body)));
            const token = path(['headers', 'xxl-job-access-token'], req);
            if (!!this.accessToken && this.accessToken !== token) {
                res.send({ code: 500, msg: 'access token incorrect' });
                return;
            }
            if (!propOr(false, 'body', req)) {
                res.send({ code: 500, msg: 'need apply body-parser middleware first' });
                return;
            }
            await next();
        });
        this.addRoutes(uri);
    }
    /**
     * 添加xxl-job相关的路由,供调度中心访问
     * @param {string} baseUri
     */
    addRoutes(baseUri) {
        // detect whether the executor is online
        this.router.post(`${baseUri}/beat`, async (...contexts) => {
            const { res } = this.wrappedHandler(contexts);
            res.send(this.beat());
        });
        // check whether is already have the same job is running
        this.router.post(`${baseUri}/idleBeat`, async (...contexts) => {
            const { req, res } = this.wrappedHandler(contexts);
            const jobId = pathOr(-1, ['body', 'jobId'], req);
            res.send(this.idleBeat(jobId));
        });
        // trigger job
        this.router.post(`${baseUri}/run`, async (...contexts) => {
            const { req, res } = this.wrappedHandler(contexts);
            res.send(this.run(propOr({}, 'body', req)));
        });
        // kill job
        this.router.post(`${baseUri}/kill`, async (...contexts) => {
            const { req, res } = this.wrappedHandler(contexts);
            res.send(this.killJob(pathOr(-1, ['body', 'jobId'], req)));
        });
        // view job's execution log
        this.router.post(`${baseUri}/log`, async (...contexts) => {
            const { req, res } = this.wrappedHandler(contexts);
            const { logDateTim: logDateTime, logId, fromLineNum } = propOr({}, 'body', req);
            const data = await this.readLog(logDateTime, logId, fromLineNum);
            res.send(data);
        });
    }
    /**
     * 将koa和express的request body处理成相同的结构,方便后边router处理
     * @param {any} contexts
     * @return {Object}
     */
    wrappedHandler(contexts) {
        switch (this.appType) {
            case 'EXPRESS': {
                const [req, res] = contexts;
                return { req, res };
            }
            case 'KOA': {
                const [ctx] = contexts;
                return { req: propOr({}, 'request', ctx), res: { send: (body) => ctx.body = body } };
            }
        }
    }
    /**
     * 心跳检测:调度中心检测执行器是否在线时使用
     * @return {{code: number, msg: string}}
     */
    beat() {
        return { code: 200, msg: 'success' };
    }
    /**
     * 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
     * @param {string} jobId - 任务ID
     * @return {{code: number, msg: string}}
     */
    idleBeat(jobId) {
        return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
    }
    /**
     * 触发任务执行
     * @param {number} jobId - 任务ID
     * @param {string} handlerName - 任务的handler名字
     * @param {string} jobJsonParams - 任务参数
     * @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
     * @param {number} logId - 本次调度日志ID
     * @param {number} - 本次调度日志时间
     * @return {{code: number, msg: string}}
     */
    run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
        // check executorHandler
        const jobHandler = this.jobHandlers.get(handlerName);
        if (!jobHandler) {
            return { code: 500, msg: `no matched jobHandler(${handlerName})` };
        }
        // execute job
        this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
        return { code: 200, msg: 'success' };
    }
    /**
     * 终止任务
     * @param {number} jobId - 任务ID
     * @return {{code: number, msg: string}}
     */
    killJob(jobId) {
        return { code: 500, msg: `not yet support, jobId(${jobId})` };
    }
    /**
     * 查看执行日志
     * @param {number} logDateTime - 本次调度日志时间
     * @param {number} logId - 本次调度日志ID
     * @param {number} fromLineNum - 日志开始行号
     * @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
     */
    async readLog(logDateTime, logId, fromLineNum) {
        let logContent;
        let toLineNum;
        try {
            const lines = await this.jobManager.readJobLog(logDateTime, logId);
            lines.splice(0, fromLineNum - 1);
            if (last(lines) === '')
                lines.pop();
            toLineNum = fromLineNum + lines.length - 1;
            lines.unshift('');
            logContent = lines.join('\n');
        }
        catch (err) {
            log.err('readLog error: %o', err.message || err);
            toLineNum = fromLineNum;
            logContent = err.toString();
        }
        return { code: 200, content: { fromLineNum, toLineNum, logContent } };
    }
    /**
     * 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
     */
    async registry() {
        const url = `${this.scheduleCenterUrl}/api/registry`;
        const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
        const headers = { 'xxl-job-access-token': this.accessToken };
        await xxlPostTask({ url, data, config: { headers } })
            .chain(tapTask((response) => log.trace('registry %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
            .orElse((err) => {
            log.err('registry error: %o', err.message || err);
            return Task.of();
        })
            .run().promise();
    }
    /**
     * 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
     */
    async registryRemove() {
        const url = `${this.scheduleCenterUrl}/api/registryRemove`;
        const data = { 'registryGroup': 'EXECUTOR', 'registryKey': this.executorKey, 'registryValue': this.executorUrl };
        const headers = { 'xxl-job-access-token': this.accessToken };
        await Task.of({ url, data, config: { headers } })
            .chain(xxlPostTask)
            .chain(tapTask((response) => log.trace('registry remove %o ==> %o', data, omitNil(propOr({}, 'data', response)))))
            .orElse((err) => {
            log.err('registry remove error: %o', err.message || err);
            return Task.of();
        }).run().promise();
    }
    /**
     * 任务回调:执行器执行完任务后,回调任务结果时使用
     * @param {*} error
     * @param {{logId: number, result: any}} jobResult
     */
    async callback(error, { logId, result }) {
        const url = `${this.scheduleCenterUrl}/api/callback`;
        const headers = { 'xxl-job-access-token': this.accessToken };
        const handleCode = error ? 500 : 200;
        const handleMsg = error ? error.message || error.toString() : (result ? JSON.stringify(result) : 'success');
        const data = [{ logId, logDateTim: Date.now(), handleCode, handleMsg }];
        await Task.of({ url, data, config: { headers } })
            .chain(xxlPostTask)
            .chain(tapTask((response) => log.trace('callback %o ==> %o', data[0], omitNil(propOr({}, 'data', response)))))
            .orElse(tapTask((err) => log.err('callback error: %o', err.message || err)))
            .run().promise();
    }
}
module.exports = Executor;
//# sourceMappingURL=executor.js.map