job-manager.ts 4.7 KB
/**
 * 任务管理系统 - 用于管理并发任务,支持日志记录和超时控制
 */
export class JobManager {
  private runningJobs;    // 存储正在运行的任务ID集合
  private jobLogPath;     // 任务日志存储路径

  /**
   * 构造函数
   * @param {string} jobLogPath - 任务日志存储路径
   */
  constructor(jobLogPath) {
    this.jobLogPath = jobLogPath;
    this.runningJobs = new Set();  // 使用Set存储运行中任务ID,保证唯一性
  }

  /**
   * 根据执行时间获取日志文件路径
   * @param {number} dateTime - 任务执行时间戳
   * @return {string} - 日志文件完整路径
   */
  getLogFilePath(dateTime) {
    // 待实现:根据时间戳生成日志文件路径
    // 示例:按日期分目录存储
  }

  /**
   * 生成任务日志器的命名空间
   * @param {string} handlerName - 任务处理器名称
   * @param {number} dateTime - 任务执行时间戳
   * @param {number} logId - 日志唯一ID
   * @return {string} - 日志器命名空间字符串
   */
  getJobLoggerNamespace(handlerName, dateTime, logId) {
    // 待实现:生成格式化的日志命名空间
    // 示例:`job:${handlerName}:${dateTime}:${logId}`
  }

  /**
   * 检查任务是否正在运行
   * @param {number} jobId - 任务ID
   * @return {boolean} - 是否存在该运行中任务
   */
  hasJob(jobId) {
    return this.runningJobs.has(jobId);
  }

  /**
   * 执行任务
   * @param {number} jobId - 任务ID
   * @param {string} jobJsonParams - 任务参数(JSON字符串格式)
   * @param {number} logId - 日志ID
   * @param {number} logDateTime - 日志时间戳
   * @param {number} executorTimeout - 执行超时时间(秒)
   * @param {string} handlerName - 处理器名称
   * @param {function} jobHandler - 任务处理函数
   * @param {function} callback - 任务完成回调函数
   */
  async runJob(
    jobId,
    jobJsonParams,
    logId,
    logDateTime,
    executorTimeout,
    handlerName,
    jobHandler,
    callback
  ) {
    // 1. 初始化日志记录器(待实现)
    // const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
    // const logFilePath = this.getLogFilePath(logDateTime);
    // const jobLogger = logger(loggerNamespace, logFilePath);

    let result, timeoutTimer, error;
    const startTime = Date.now();

    try {
      // 检查是否有重复任务
      if (this.hasJob(jobId)) {
        throw new Error('已有相同任务正在运行');
      }
      this.runningJobs.add(jobId);

      // 解析任务参数
      let jobParams = {};
      try {
        jobParams = JSON.parse(jobJsonParams) || {};
      } catch { }

      // 设置超时定时器
      if (executorTimeout) {
        timeoutTimer = setTimeout(
          () => this.finishJob({
            jobId,
            logId,
            callback,
            timeoutTimer: null,
            use_time: Date.now() - startTime,
            result: null,
            error: new Error('任务执行超时')
          }),
          executorTimeout * 1000
        );
      }

      // 执行任务处理函数
      result = await jobHandler(jobParams);

      // 处理可能存在的错误结果
      if (result && result.error) {
        error = result.error;
        delete result.error;
        if (Object.keys(result).length === 0) {
          result = undefined;
        }
      }

    } catch (err) {
      error = err;
    } finally {
      await this.finishJob({
        jobId,
        logId,
        callback,
        timeoutTimer,
        use_time: Date.now() - startTime,
        result,
        error
      });
    }
  }

  /**
   * 读取任务日志
   * @param {number} logDateTime - 日志时间戳
   * @param {number} logId - 日志ID
   * @return {Promise<Array>} - 日志内容数组
   */
  async readJobLog(logDateTime, logId) {
    // 待实现:读取指定日志文件内容
  }

  /**
   * 完成任务处理
   * @param {Object} params - 任务完成参数对象
   * @param {number} params.jobId - 任务ID
   * @param {number} params.logId - 日志ID
   * @param {function} params.callback - 回调函数
   * @param {number} params.timeoutTimer - 超时定时器
   * @param {number} params.use_time - 任务耗时(毫秒)
   * @param {*} params.result - 任务结果
   * @param {Error} params.error - 任务错误
   */
  async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
    try {
      // 清除超时定时器
      timeoutTimer && clearTimeout(timeoutTimer);

      // 执行回调
      await callback({ error, logId, use_time, result });

    } catch (err) {
      console.error(`完成任务时发生错误: ${err.message}`);
    } finally {
      // 无论成功与否,都从运行集合中移除
      this.runningJobs.delete(jobId);
    }
  }
}