executor.js 6.5 KB
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Executor = void 0;
const job_manager_1 = require("./job-manager");
const axios_1 = require("axios");
class Executor {
    executorKey;
    scheduleCenterUrl;
    accessToken;
    executorUrl;
    jobHandlers;
    jobManager;
    constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers) {
        this.executorKey = executorKey;
        this.scheduleCenterUrl = scheduleCenterUrl;
        this.accessToken = accessToken;
        this.jobHandlers = jobHandlers;
        this.jobManager = new job_manager_1.JobManager(jobLogPath);
    }
    applyMiddleware({ appType, domain }) {
        this.executorUrl = domain;
        if (appType === 'express') {
            return this.applyExpressMiddleware();
        }
        else if (appType === 'koa') {
            return this.applyKoaMiddleware();
        }
        else {
            throw new Error(`Unsupported appType: ${appType}`);
        }
    }
    applyExpressMiddleware() {
        const express = require('express');
        const app = express().use(require('body-parser').json());
        const router = express.Router();
        router.use(async (req, res, next) => {
            res.status(200);
            const token = req.headers && req.headers['xxl-job-access-token'];
            if (!!this.accessToken && this.accessToken !== token) {
                res.send({ code: 500, msg: 'access token incorrect' });
                return;
            }
            if (!req.body) {
                res.send({ code: 500, msg: 'body is null' });
                return;
            }
            next();
        });
        this.addJobRoutes(router, 'express');
        app.use(router);
        return app;
    }
    applyKoaMiddleware() {
        const Koa = require('koa');
        const app = new Koa();
        app.use(require('koa-bodyparser')());
        const Router = require('koa-router');
        const router = new Router();
        const bodyParser = require('koa-bodyparser');
        router.use(async (ctx, next) => {
            ctx.status = 200;
            const token = ctx.headers && ctx.headers['xxl-job-access-token'];
            if (!!this.accessToken && this.accessToken !== token) {
                ctx.body = { code: 500, msg: 'access token incorrect' };
                return;
            }
            if (!ctx.request.body) {
                ctx.body = { code: 500, msg: 'body is null' };
                return;
            }
            await next();
        });
        this.addJobRoutes(router, 'koa');
        app.use(bodyParser());
        app.use(router.routes()).use(router.allowedMethods());
        return app;
    }
    addJobRoutes(router, appType) {
        const handleRoute = (path, handler) => {
            if (appType === 'express') {
                router.post(path, async (req, res) => {
                    const result = handler(req.body || {});
                    res.send(result);
                });
            }
            else if (appType === 'koa') {
                router.post(path, async (ctx) => {
                    const result = handler(ctx.request.body || {});
                    ctx.body = result;
                });
            }
        };
        handleRoute('/beat', () => this.beat());
        handleRoute('/idleBeat', (body) => this.idleBeat(body.jobId || -1));
        handleRoute('/run', (body) => this.run(body || {}));
        handleRoute('/kill', (body) => this.killJob(body.jobId || -1));
        handleRoute('/log', async (body) => {
            const { logDateTim, logId, fromLineNum } = body || {};
            return await this.readLog(logDateTim, logId, fromLineNum);
        });
    }
    beat() {
        return { code: 200, msg: 'success' };
    }
    idleBeat(jobId) {
        return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
    }
    run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
        const jobHandler = this.jobHandlers.get(handlerName);
        if (!jobHandler) {
            return { code: 500, msg: `no matched jobHandler(${handlerName})` };
        }
        this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
        return { code: 200, msg: 'success' };
    }
    killJob(jobId) {
        return { code: 500, msg: `not yet support, jobId(${jobId})` };
    }
    async readLog(logDateTime, logId, fromLineNum) {
        return {
            code: 200, content: {
                "fromLineNum": 0,
                "toLineNum": 100,
                "logContent": "test",
                "isEnd": true
            }
        };
    }
    async registry() {
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, {
            registryGroup: 'EXECUTOR',
            registryKey: this.executorKey,
            registryValue: this.executorUrl
        }, {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => {
        }).catch(error => console.error('执行器注册失败:', error.message));
    }
    async registryRemove() {
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, {
            'registryGroup': 'EXECUTOR',
            'registryKey': this.executorKey,
            'registryValue': this.executorUrl
        }, {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message));
    }
    async callback({ error, logId, result, use_time }) {
        var handleMsg = JSON.stringify({ use_time, result, error: error && error.message });
        return axios_1.default.post(`${this.scheduleCenterUrl}/api/callback`, [{
                logId,
                logDateTim: Date.now(),
                handleCode: error ? 500 : 200,
                handleMsg
            }], {
            headers: {
                'XXL-JOB-ACCESS-TOKEN': this.accessToken,
                'Content-Type': 'application/json'
            }
        }).then((response) => response.data).catch((err) => {
            console.error(`callback error:${JSON.stringify(result)} ${err.message}`);
        });
    }
}
exports.Executor = Executor;
//# sourceMappingURL=executor.js.map