job_manager.ts 3.7 KB
import * as moment from 'moment'
import * as Path from 'path'


// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')


/**
 * 任务管理
 */
export class JobManager {
  context
  runningJobs
  jobLogPath
  /**
   * @param {string} jobLogPath
   * @param {*} context
   */
  constructor(jobLogPath, context) {
    // mkdir(jobLogPath)
    this.jobLogPath = jobLogPath
    this.context = context
    this.runningJobs = new Set()
  }

  /**
   * 根据调度时间获取日志文件路径
   * @param {number} dateTime
   * @return {string}
   */
  getLogFilePath(dateTime) {
    return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`)
  }

  /**
   * 构造任务logger的namespace
   * @param {string} handlerName
   * @param {number} dateTime
   * @param {number} logId
   * @return {string}
   */
  getJobLoggerNamespace(handlerName, dateTime, logId) {
    return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`
  }

  /**
   * @param {{number}} jobId
   * @return {boolean}
   */
  hasJob(jobId) {
    return this.runningJobs.has(jobId)
  }

  async runJob(
    jobId,               // 任务ID
    jobJsonParams,       // 任务参数(JSON格式)
    logId,               // 日志ID
    logDateTime,         // 日志时间
    executorTimeout,     // 执行超时时间(秒)
    handlerName,         // 处理器名称
    jobHandler,          // 任务处理函数
    callback             // 回调函数
  ) {

    // 1. 初始化日志记录器
    // const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
    // const logFilePath = this.getLogFilePath(logDateTime);
    // const jobLogger = logger(loggerNamespace, logFilePath);


    let result, timeoutTimer, error

    try {

      if (this.hasJob(jobId)) {
        throw new Error('已有相同任务正在运行')
      }
      this.runningJobs.add(jobId)

      let jobParams
      try {
        jobParams = JSON.parse(jobJsonParams);
      } catch {
        jobJsonParams = {}
      }

      if (executorTimeout) {
        timeoutTimer = setTimeout(
          () => this.finishJob({
            jobId,
            logId,
            callback,
            timeoutTimer,
            result: null,
            error: new Error('任务执行超时')
          }),
          executorTimeout * 1000
        );
      }

      result = await jobHandler(jobParams, this.context);

    } catch (error) {
      error = error
    }

    await this.finishJob({
      jobId,
      logId,
      callback,
      timeoutTimer,
      result,
      error
    });


  }

  /**
   * @param {number} logDateTime
   * @param {number} logId
   * @return {Promise<Array>}
   */
  async readJobLog(logDateTime, logId) {
    const logFilePath = this.getLogFilePath(logDateTime)
    const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' '
    return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : []
  }

  /**
   * @param {number} jobId
   * @param {number} logId
   * @param {*} jobLogger
   * @param {function} callback
   * @param {number} timeout
   * @param {*} result
   * @param {*} error
   * @return {Promise<void>}
   */
  async finishJob({ jobId, logId, callback, timeoutTimer, result, error }) {
    try {
      timeoutTimer && clearTimeout(timeoutTimer)

      await callback(error, { logId, result })

    } catch (err) {
      console.log(`finishJob error: ${err.message}`)
    } finally {
      this.runningJobs.delete(jobId)
    }
  }
}