// job_manager.ts 4.0 KB
import * as fs from 'fs'
import * as moment from 'moment'
import * as Path from 'path'


// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')


/**
 * Job manager.
 *
 * Tracks which job IDs are currently running, runs job handlers with an
 * optional timeout, and reports every outcome exactly once through a
 * caller-supplied callback.
 */
export class JobManager {
  // Value handed to every job handler as its second argument.
  context
  // Set of job IDs that are currently marked as running.
  runningJobs
  // Directory containing the per-day job log files.
  jobLogPath

  /**
   * @param {string} jobLogPath directory of the job log files
   * @param {*} context value passed through to job handlers
   */
  constructor(jobLogPath, context) {
    // mkdir(jobLogPath)
    this.jobLogPath = jobLogPath
    this.context = context
    this.runningJobs = new Set()
  }

  /**
   * Build the log file path for a scheduling time.
   *
   * The result is `<jobLogPath>/<YYYY-MM-DD>.log`, resolved against
   * `process.cwd()`.
   * @param {number} dateTime Unix timestamp in milliseconds (moment format `x`)
   * @return {string}
   */
  getLogFilePath(dateTime) {
    return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`)
  }

  /**
   * Build the logger namespace of a job run:
   * `<handlerName>-<YYMMDD>-<logId>-executing`.
   * @param {string} handlerName
   * @param {number} dateTime Unix timestamp in milliseconds (moment format `x`)
   * @param {number} logId
   * @return {string}
   */
  getJobLoggerNamespace(handlerName, dateTime, logId) {
    return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`
  }

  /**
   * Tell whether a job with this ID is currently marked as running.
   * @param {number} jobId
   * @return {boolean}
   */
  hasJob(jobId) {
    return this.runningJobs.has(jobId)
  }

  /**
   * Run a job handler and report its outcome through `callback`.
   *
   * `callback(error, { logId, use_time, result })` is invoked exactly once per
   * call: with the handler's result, with the error it threw or returned in
   * `result.error`, with a timeout error, or with a "duplicate job" error.
   * Errors are never thrown to the caller of `runJob`.
   *
   * @param {number} jobId job ID
   * @param {string} jobJsonParams job parameters as JSON; unparsable input becomes `{}`
   * @param {number} logId log ID
   * @param {number} logDateTime log time (unused while the job logger is disabled)
   * @param {number} executorTimeout execution timeout in seconds; falsy disables it
   * @param {string} handlerName handler name (unused while the job logger is disabled)
   * @param {function} jobHandler job function, called as `jobHandler(params, context)`
   * @param {function} callback completion callback
   * @return {Promise<void>}
   */
  async runJob(
    jobId,
    jobJsonParams,
    logId,
    logDateTime,
    executorTimeout,
    handlerName,
    jobHandler,
    callback
  ) {
    // Job logger initialisation is currently disabled:
    // const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
    // const logFilePath = this.getLogFilePath(logDateTime);
    // const jobLogger = logger(loggerNamespace, logFilePath);

    const startedAt = Date.now()

    if (this.hasJob(jobId)) {
      // Report the rejection, but leave the running marker alone: it belongs
      // to the instance that is still executing.
      await this.finishJob({
        jobId,
        logId,
        callback,
        timeoutTimer: undefined,
        use_time: Date.now() - startedAt,
        result: undefined,
        error: new Error('已有相同任务正在运行'),
        releaseJob: false
      })
      return
    }
    this.runningJobs.add(jobId)

    let timeoutTimer: ReturnType<typeof setTimeout> | undefined
    let settled = false
    // Funnel for both the timeout path and the normal completion path, so that
    // whichever comes second is ignored instead of reporting a second time.
    const settle = async (outcome: unknown, failure: unknown) => {
      if (settled) {
        return
      }
      settled = true
      await this.finishJob({
        jobId,
        logId,
        callback,
        timeoutTimer,
        use_time: Date.now() - startedAt,
        result: outcome,
        error: failure
      })
    }

    let result
    let error: unknown

    try {
      let jobParams
      try {
        jobParams = JSON.parse(jobJsonParams)
      } catch {
        jobParams = {}
      }

      if (executorTimeout) {
        // The handler cannot be cancelled; on timeout the job is reported as
        // failed and its late result (if any) is discarded by `settle`.
        timeoutTimer = setTimeout(
          () => { void settle(null, new Error('任务执行超时')) },
          executorTimeout * 1000
        )
      }

      result = await jobHandler(jobParams, this.context)

      // A handler may signal failure by returning `{ error }`; the error is
      // split off and an otherwise empty result is reported as undefined.
      if (result && result.error) {
        error = result.error
        delete result.error
        if (Object.keys(result).length === 0) {
          result = undefined
        }
      }
    } catch (err) {
      error = err
    }

    await settle(result, error)
  }

  /**
   * Read the log lines of one job run from its per-day log file.
   * Returns an empty array when the log file does not exist.
   * @param {number} logDateTime
   * @param {number} logId
   * @return {Promise<Array>}
   */
  async readJobLog(logDateTime, logId) {
    const logFilePath = this.getLogFilePath(logDateTime)
    const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' '
    // NOTE(review): `searchInFile` is not imported in this file (its require
    // from './file' is commented out at the top) — restore that import before
    // relying on this method.
    return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : []
  }

  /**
   * Finish a job run: cancel its timeout timer, invoke the callback and
   * release the job ID. Errors thrown by the callback are logged, not rethrown.
   * @param {number} jobId
   * @param {number} logId
   * @param {function} callback called as `callback(error, { logId, use_time, result })`
   * @param {*} timeoutTimer timer handle to clear, if any
   * @param {number} use_time elapsed time in milliseconds
   * @param {*} result
   * @param {*} error
   * @param {boolean} [releaseJob=true] whether to remove `jobId` from the running set
   * @return {Promise<void>}
   */
  async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error, releaseJob = true }) {
    try {
      if (timeoutTimer) {
        clearTimeout(timeoutTimer)
      }

      await callback(error, { logId, use_time, result })
    } catch (err) {
      // Anything can be thrown, so only read `.message` when it is there.
      const detail = err !== null && typeof err === 'object' && 'message' in err
        ? (err as { message: unknown }).message
        : err
      console.log(`finishJob error: ${detail}`)
    } finally {
      if (releaseJob) {
        this.runningJobs.delete(jobId)
      }
    }
  }
}