// job_manager.js (3.1 KB)
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.JobManager = void 0;
const fs = require("fs");
const moment = require("moment");
const Path = require("path");
/**
 * Runs job handlers, tracks which job ids are currently executing, and
 * locates per-day log files and per-execution log namespaces.
 */
class JobManager {
    context;
    runningJobs;
    jobLogPath;
    /**
     * @param {string} jobLogPath - Directory (resolved against `process.cwd()`) that holds the daily log files.
     * @param {*} context - Value passed as the second argument to every job handler.
     */
    constructor(jobLogPath, context) {
        this.jobLogPath = jobLogPath;
        this.context = context;
        this.runningJobs = new Set();
    }
    /**
     * Builds the absolute path of the log file for the day containing `dateTime`.
     *
     * @param {number|string} dateTime - Parsed with moment's `'x'` token (Unix millisecond timestamp).
     * @returns {string} `<cwd>/<jobLogPath>/YYYY-MM-DD.log`
     */
    getLogFilePath(dateTime) {
        return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`);
    }
    /**
     * Builds the logger namespace used to tag the log lines of one job execution.
     *
     * @param {string} handlerName - Name of the job handler (may be empty).
     * @param {number|string} dateTime - Parsed with moment's `'x'` token (Unix millisecond timestamp).
     * @param {number|string} logId - Identifier of the execution log.
     * @returns {string} `<handlerName>-YYMMDD-<logId>-executing`
     */
    getJobLoggerNamespace(handlerName, dateTime, logId) {
        return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`;
    }
    /**
     * @param {*} jobId
     * @returns {boolean} Whether a job with this id is currently registered as running.
     */
    hasJob(jobId) {
        return this.runningJobs.has(jobId);
    }
    /**
     * Executes `jobHandler` for `jobId` and reports the outcome through `callback`
     * exactly once. This method does not reject on handler failure; errors are
     * delivered as the first callback argument.
     *
     * @param {*} jobId - Identifier used to prevent concurrent runs of the same job.
     * @param {string} jobJsonParams - JSON text; if it cannot be parsed the handler receives `{}`.
     * @param {*} logId - Passed through to the callback payload.
     * @param {*} logDateTime - Unused here; kept for interface compatibility.
     * @param {number} executorTimeout - Timeout in seconds; a falsy value disables the timeout.
     * @param {string} handlerName - Unused here; kept for interface compatibility.
     * @param {Function} jobHandler - Called as `jobHandler(jobParams, context)`; may be async.
     * @param {Function} callback - Called as `callback(error, { logId, use_time, result })`.
     * @returns {Promise<void>}
     */
    async runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
        const startedAt = Date.now();
        if (this.hasJob(jobId)) {
            // Report the rejection, but leave the running marker alone: it
            // belongs to the execution that is still in progress.
            await this.finishJob({
                jobId,
                logId,
                callback,
                timeoutTimer: undefined,
                use_time: Date.now() - startedAt,
                result: undefined,
                error: new Error('已有相同任务正在运行'),
                releaseJob: false
            });
            return;
        }
        this.runningJobs.add(jobId);
        let timeoutTimer;
        let finished = false;
        // Both the timeout and the handler completion funnel through here so
        // the callback fires once and a late completion cannot clear the
        // marker of a newer run that reused the same jobId.
        const finishOnce = async (result, error) => {
            if (finished) {
                return;
            }
            finished = true;
            await this.finishJob({
                jobId,
                logId,
                callback,
                timeoutTimer,
                use_time: Date.now() - startedAt,
                result,
                error
            });
        };
        let result;
        let error;
        try {
            let jobParams;
            try {
                jobParams = JSON.parse(jobJsonParams);
            }
            catch {
                jobParams = {};
            }
            if (executorTimeout) {
                timeoutTimer = setTimeout(() => {
                    // finishJob handles callback failures itself, so this
                    // promise is intentionally not awaited.
                    void finishOnce(null, new Error('任务执行超时'));
                }, executorTimeout * 1000);
            }
            result = await jobHandler(jobParams, this.context);
            if (result && result.error) {
                // A handler may signal failure by returning `{ error }`;
                // split it from the remaining payload.
                error = result.error;
                delete result.error;
                if (Object.keys(result).length === 0) {
                    result = undefined;
                }
            }
        }
        catch (err) {
            error = err;
        }
        await finishOnce(result, error);
    }
    /**
     * Reads the log lines of one execution from the daily log file.
     *
     * @param {number|string} logDateTime - Selects the daily log file and the namespace date.
     * @param {*} logId - Identifier of the execution log.
     * @returns {Promise<*>} Result of `searchInFile`, or `[]` when the log file does not exist.
     */
    async readJobLog(logDateTime, logId) {
        const logFilePath = this.getLogFilePath(logDateTime);
        // The handler name is left empty, so the search key starts at "-YYMMDD-<logId>-executing ".
        const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' ';
        // NOTE(review): `searchInFile` is not defined or imported in the visible
        // file; it must be brought into scope for this method to work.
        return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : [];
    }
    /**
     * Clears the timeout timer, reports the outcome through `callback` and
     * releases the running marker. Callback failures are logged, not rethrown.
     *
     * @param {object} params
     * @param {*} params.jobId
     * @param {*} params.logId
     * @param {Function} params.callback - Called as `callback(error, { logId, use_time, result })`.
     * @param {*} [params.timeoutTimer] - Timer handle to clear, if any.
     * @param {number} params.use_time - Elapsed time in milliseconds.
     * @param {*} [params.result]
     * @param {*} [params.error]
     * @param {boolean} [params.releaseJob=true] - When false, `jobId` stays registered as running.
     * @returns {Promise<void>}
     */
    async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error, releaseJob = true }) {
        try {
            if (timeoutTimer) {
                clearTimeout(timeoutTimer);
            }
            await callback(error, { logId, use_time, result });
        }
        catch (err) {
            // `err` may be a non-Error value (even null), so avoid a bare `.message` access.
            console.log(`finishJob error: ${err?.message ?? err}`);
        }
        finally {
            if (releaseJob) {
                this.runningJobs.delete(jobId);
            }
        }
    }
}
exports.JobManager = JobManager;
//# sourceMappingURL=job_manager.js.map