// job_manager.js (4.6 KB)
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.JobManager = void 0;
const fs = require("fs");
const moment = require("moment");
const Path = require("path");
// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')
/**
 * Job manager: tracks which job ids are currently running, runs job handlers
 * with an optional timeout, and reports each run's outcome through a callback
 * exactly once.
 */
class JobManager {
    context;
    runningJobs;
    jobLogPath;
    /**
     * @param {string} jobLogPath - Directory (resolved against the current working directory) holding the daily log files.
     * @param {*} context - Opaque value passed as the second argument to every job handler.
     */
    constructor(jobLogPath, context) {
        this.jobLogPath = jobLogPath;
        this.context = context;
        // Ids of the jobs that are currently registered as running.
        this.runningJobs = new Set();
    }
    /**
     * Builds the log file path for a given schedule time (one file per day).
     * @param {number} dateTime - Unix timestamp in milliseconds.
     * @return {string} Absolute path of the `YYYY-MM-DD.log` file.
     */
    getLogFilePath(dateTime) {
        return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`);
    }
    /**
     * Builds the logger namespace for one job execution.
     * @param {string} handlerName
     * @param {number} dateTime - Unix timestamp in milliseconds.
     * @param {number} logId
     * @return {string} `<handlerName>-<YYMMDD>-<logId>-executing`
     */
    getJobLoggerNamespace(handlerName, dateTime, logId) {
        return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`;
    }
    /**
     * @param {number} jobId
     * @return {boolean} True when a job with this id is currently registered as running.
     */
    hasJob(jobId) {
        return this.runningJobs.has(jobId);
    }
    /**
     * Runs a job handler and reports the outcome through `callback` exactly once.
     *
     * - If a job with the same id is already running, the callback receives an
     *   error and the running job's registration is left untouched.
     * - If `jobJsonParams` is not valid JSON, the handler receives `{}`.
     * - If `executorTimeout` is greater than zero and the handler has not settled
     *   in time, the callback receives a timeout error and the job id is
     *   released; the handler's later result is then discarded.
     * - If the handler resolves with an object carrying a truthy `err` property,
     *   that value is reported as the error and removed from the result.
     *
     * @param {number} jobId - Job id.
     * @param {string} jobJsonParams - Job parameters as a JSON string.
     * @param {number} logId - Log id, echoed back to the callback.
     * @param {number} logDateTime - Log time (currently unused here).
     * @param {number} executorTimeout - Timeout in seconds; only effective when greater than zero.
     * @param {string} handlerName - Handler name (currently unused here).
     * @param {function(*, *): *} jobHandler - Called with `(jobParams, context)`; may return a promise.
     * @param {function(*, {logId: number, use_time: number, result: *}): *} callback - Completion callback.
     * @return {Promise<void>} Resolves once the handler has settled and any report has been delivered.
     */
    async runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
        const startedAt = Date.now();
        const elapsed = () => Date.now() - startedAt;

        if (this.hasJob(jobId)) {
            // Report the rejection without releasing the id: it belongs to the
            // run that is still in progress, not to this rejected attempt.
            await this.finishJob({
                jobId,
                logId,
                callback,
                use_time: elapsed(),
                result: undefined,
                error: new Error('已有相同任务正在运行'),
                releaseJob: false,
            });
            return;
        }
        this.runningJobs.add(jobId);

        let timeoutTimer;
        let settled = false;
        // Whichever of "timeout fired" / "handler settled" happens first wins;
        // the loser must neither call the callback again nor release an id
        // that a newer run may have registered in the meantime.
        const settle = async (result, error) => {
            if (settled) {
                return;
            }
            settled = true;
            await this.finishJob({
                jobId,
                logId,
                callback,
                timeoutTimer,
                use_time: elapsed(),
                result,
                error,
            });
        };

        let result;
        let error;
        try {
            let jobParams;
            try {
                jobParams = JSON.parse(jobJsonParams);
            }
            catch {
                // Best-effort: malformed or missing params fall back to an empty object.
                jobParams = {};
            }
            if (executorTimeout > 0) {
                timeoutTimer = setTimeout(() => {
                    // Intentionally not awaited: finishJob handles its own errors.
                    void settle(null, new Error('任务执行超时'));
                }, executorTimeout * 1000);
            }
            result = await jobHandler(jobParams, this.context);
            if (result && result.err) {
                error = result.err;
                delete result.err;
                if (Object.keys(result).length === 0) {
                    result = null;
                }
            }
        }
        catch (err) {
            error = err;
        }
        await settle(result, error);
    }
    /**
     * Reads the log lines of one job execution from the daily log file.
     * @param {number} logDateTime - Unix timestamp in milliseconds.
     * @param {number} logId
     * @return {Promise<Array>} Empty array when the log file does not exist.
     */
    async readJobLog(logDateTime, logId) {
        const logFilePath = this.getLogFilePath(logDateTime);
        const jobLogNamespace = `${this.getJobLoggerNamespace('', logDateTime, logId)} `;
        if (!fs.existsSync(logFilePath)) {
            return [];
        }
        // NOTE(review): `searchInFile` is not defined or imported in this file
        // (its require from './file' is commented out), so this call throws a
        // ReferenceError until that import is restored — confirm.
        return searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`);
    }
    /**
     * Delivers the outcome of a run to `callback`, cancels the timeout timer and
     * (by default) releases the job id. Errors thrown by the callback are logged
     * and not propagated.
     * @param {Object} args
     * @param {number} args.jobId
     * @param {number} args.logId
     * @param {function} args.callback - Invoked as `callback(error, { logId, use_time, result })`.
     * @param {*} [args.timeoutTimer] - Timer handle returned by `setTimeout`, if any.
     * @param {number} args.use_time - Elapsed time in milliseconds.
     * @param {*} args.result
     * @param {*} args.error
     * @param {boolean} [args.releaseJob=true] - When false, `jobId` stays registered as running.
     * @return {Promise<void>}
     */
    async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error, releaseJob = true }) {
        try {
            if (timeoutTimer) {
                clearTimeout(timeoutTimer);
            }
            await callback(error, { logId, use_time, result });
        }
        catch (err) {
            console.log(`finishJob error: ${err?.message ?? err}`);
        }
        finally {
            if (releaseJob) {
                this.runningJobs.delete(jobId);
            }
        }
    }
}
exports.JobManager = JobManager;
//# sourceMappingURL=job_manager.js.map