executor.js 8.7 KB
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Executor = void 0;
const job_manager_1 = require("./job_manager");
// const logger = require('./logger')
// const log = logger('xxl-job-executor')
const axios_1 = require("axios");
class Executor {
    executorKey;
    scheduleCenterUrl;
    accessToken;
    executorUrl;
    jobHandlers;
    jobManager;
    /**
     * @param {string} executorKey
     * @param {string} scheduleCenterUrl
     * @param {string} accessToken
     * @param {string} jobLogPath
     * @param {Map} jobHandlers
     * @param {*} context
     */
    constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
        this.executorKey = executorKey;
        this.scheduleCenterUrl = scheduleCenterUrl;
        this.accessToken = accessToken;
        this.jobHandlers = jobHandlers;
        this.jobManager = new job_manager_1.JobManager(jobLogPath, context);
    }
    /**
     * 应用执行器中间件
     * @param {*} app
     * @param {string} domain
     */
    applyMiddleware({ app, domain }) {
        this.executorUrl = domain;
        const express = require('express');
        var router = new express.Router();
        //请求认证
        router.use(async (req, res, next) => {
            res.status(200);
            const token = req.headers && req.headers['xxl-job-access-token'];
            if (!!this.accessToken && this.accessToken !== token) {
                res.send({ code: 500, msg: 'access token incorrect' });
                return;
            }
            if (!req.body) {
                res.send({ code: 500, msg: 'body is null' });
                return;
            }
            next();
        });
        this.addJobRoutes(router);
        app.use(router);
    }
    /**
     * 添加xxl-job相关的路由,供调度中心访问
     * @param {express.Router} router
     * @param {string} baseUri
     */
    addJobRoutes(router) {
        router.post(`/beat`, async (req, res, next) => {
            res.send(this.beat());
        });
        router.post(`/idleBeat`, async (req, res, next) => {
            res.send(this.idleBeat(req.body.jobId || -1));
        });
        router.post(`/run`, async (req, res, next) => {
            res.send(this.run(req.body || {}));
        });
        router.post(`/kill`, async (req, res, next) => {
            res.send(this.killJob(req.body.jobId || -1));
        });
        router.post(`/log`, async (req, res, next) => {
            const { logDateTim, logId, fromLineNum } = req.body || {};
            const data = await this.readLog(logDateTim, logId, fromLineNum);
            res.send(data);
        });
    }
    /**
     * 心跳检测:调度中心检测执行器是否在线时使用
     * @return {{code: number, msg: string}}
     */
    beat() {
        return { code: 200, msg: 'success' };
    }
    /**
     * 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
     * @param {string} jobId - 任务ID
     * @return {{code: number, msg: string}}
     */
    idleBeat(jobId) {
        return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
    }
    /**
     * 触发任务执行
     * @param {number} jobId - 任务ID
     * @param {string} handlerName - 任务的handler名字
     * @param {string} jobJsonParams - 任务参数
     * @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
     * @param {number} logId - 本次调度日志ID
     * @param {number} - 本次调度日志时间
     * @return {{code: number, msg: string}}
     */
    run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
        const jobHandler = this.jobHandlers.get(handlerName);
        if (!jobHandler) {
            return { code: 500, msg: `no matched jobHandler(${handlerName})` };
        }
        this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
        return { code: 200, msg: 'success' };
    }
    /**
     * 终止任务
     * @param {number} jobId - 任务ID
     * @return {{code: number, msg: string}}
     */
    killJob(jobId) {
        return { code: 500, msg: `not yet support, jobId(${jobId})` };
    }
    /**
     * 查看执行日志
     * @param {number} logDateTime - 本次调度日志时间
     * @param {number} logId - 本次调度日志ID
     * @param {number} fromLineNum - 日志开始行号
     * @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
     */
    async readLog(logDateTime, logId, fromLineNum) {
        //待实现
        return {
            code: 200, content: {
                "fromLineNum": 0,
                "toLineNum": 100,
                "logContent": "test",
                "isEnd": true // 日志是否全部加载完
            }
        };
        //let logContent
        //let toLineNum
        // try {
        //   const lines = await this.jobManager.readJobLog(logDateTime, logId)
        //   lines.splice(0, fromLineNum - 1)
        //   if (last(lines) === '') lines.pop()
        //   toLineNum = fromLineNum + lines.length - 1
        //   lines.unshift('')
        //   logContent = lines.join('\n')
        // } catch (err) {
        //   log.err('readLog error: %o', err.message)
        //   toLineNum = fromLineNum
        //   logContent = err.toString()
        // }
        //       请求数据格式如下,放置在 RequestBody 中,JSON格式:
        //     {
        //         "logDateTim":0,     // 本次调度日志时间
        //         "logId":0,          // 本次调度日志ID
        //         "fromLineNum":0     // 日志开始行号,滚动加载日志
        //     }
        // 响应数据格式:
        //     {
        //         "code":200,         // 200 表示正常、其他失败
        //         "msg": null         // 错误提示消息
        //         "content":{
        //             "fromLineNum":0,        // 本次请求,日志开始行数
        //             "toLineNum":100,        // 本次请求,日志结束行号
        //             "logContent":"xxx",     // 本次请求日志内容
        //             "isEnd":true            // 日志是否全部加载完
        //         }
        //     }
        // return { code: 200, content: { fromLineNum, toLineNum, logContent } }
    }
    /**
     * 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
     */
    async registry() {
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, {
            registryGroup: 'EXECUTOR',
            registryKey: this.executorKey,
            registryValue: this.executorUrl
        }, {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => console.log('执行器注册成功:', response.data)).catch(error => console.error('执行器注册失败:', error.message));
    }
    /**
     * 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
     */
    async registryRemove() {
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, {
            'registryGroup': 'EXECUTOR',
            'registryKey': this.executorKey,
            'registryValue': this.executorUrl
        }, {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message));
    }
    /**
   * 任务回调:执行器执行完任务后,回调任务结果时使用
   * @param {Error|null} error 错误对象
   * @param {{logId: number, result: any}} jobResult 任务结果
   */
    async callback(error, { logId, result }) {
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/callback`, [{
                logId,
                logDateTim: Date.now(),
                handleCode: error ? 500 : 200,
                handleMsg: error ? error.message : (result ? JSON.stringify(result) : 'success')
            }], {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => response.data).catch((err) => {
            console.error(`callback error:${JSON.stringify(result)} ${err.message}`);
        });
    }
}
exports.Executor = Executor;
//# sourceMappingURL=executor.js.map