executor.ts 11.3 KB

import { JobManager } from './job-manager'
import axios from 'axios'

export class Executor {
  private executorKey
  private scheduleCenterUrl
  private accessToken
  private executorUrl
  private jobHandlers
  private jobManager

  /**
   * @param {string} executorKey
   * @param {string} scheduleCenterUrl
   * @param {string} accessToken
   * @param {string} jobLogPath
   * @param {Map} jobHandlers
   */
  constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers) {
    this.executorKey = executorKey
    this.scheduleCenterUrl = scheduleCenterUrl
    this.accessToken = accessToken
    this.jobHandlers = jobHandlers
    this.jobManager = new JobManager(jobLogPath)
  }

  /**
   * 应用执行器中间件
   * @param {*} app koa | express
   * @param {string} domain
   */
  applyMiddleware({ appType, domain }) {
    this.executorUrl = domain

    if (appType === 'express') {
      return this.applyExpressMiddleware()
    } else if (appType === 'koa') {
      return this.applyKoaMiddleware()
    } else {
      throw new Error(`Unsupported appType: ${appType}`)
    }
  }

  /**
   * Apply middleware for Express
  */
  private applyExpressMiddleware() {
    const express = require('express')
    const app = express().use(require('body-parser').json())
    const router = express.Router()

    router.use(async (req, res, next) => {
      res.status(200)

      const token = req.headers && req.headers['xxl-job-access-token']

      if (!!this.accessToken && this.accessToken !== token) {
        res.send({ code: 500, msg: 'access token incorrect' })
        return
      }

      if (!req.body) {
        res.send({ code: 500, msg: 'body is null' })
        return
      }

      next()
    })

    this.addJobRoutes(router, 'express')
    app.use(router)

    return app
  }

  /**
   * Apply middleware for Koa
   * @param {*} app 
   */
  private applyKoaMiddleware() {
    const Koa = require('koa')
    const app = new Koa()
    app.use(require('koa-bodyparser')())

    const Router = require('koa-router')
    const router = new Router()
    const bodyParser = require('koa-bodyparser')

    // Request authentication
    router.use(async (ctx, next) => {
      ctx.status = 200

      const token = ctx.headers && ctx.headers['xxl-job-access-token']

      if (!!this.accessToken && this.accessToken !== token) {
        ctx.body = { code: 500, msg: 'access token incorrect' }
        return
      }

      if (!ctx.request.body) {
        ctx.body = { code: 500, msg: 'body is null' }
        return
      }

      await next()
    })

    this.addJobRoutes(router, 'koa')
    app.use(bodyParser())
    app.use(router.routes()).use(router.allowedMethods())

    return app
  }

  /**
   * 添加xxl-job相关的路由,供调度中心访问
   * @param {express.Router} router
   * @param {string} baseUri
   */
  addJobRoutes(router, appType) {
    // router.post(`/beat`, async (req, res, next) => {
    //   res.send(this.beat())
    // })

    // router.post(`/idleBeat`, async (req, res, next) => {
    //   res.send(this.idleBeat(req.body.jobId || -1))
    // })

    // router.post(`/run`, async (req, res, next) => {
    //   res.send(this.run(req.body || {}))
    // })

    // router.post(`/kill`, async (req, res, next) => {
    //   res.send(this.killJob(req.body.jobId || -1))
    // })

    // router.post(`/log`, async (req, res, next) => {
    //   const { logDateTim, logId, fromLineNum } = req.body || {}
    //   const data = await this.readLog(logDateTim, logId, fromLineNum)
    //   res.send(data)
    // })

    // 定义通用的路由处理方法
    const handleRoute = (path: string, handler: (reqBody: any) => any) => {
      if (appType === 'express') {
        router.post(path, async (req: any, res: any) => {
          const result = handler(req.body || {});
          res.send(result);
        });
      } else if (appType === 'koa') {
        router.post(path, async (ctx: any) => {
          const result = handler(ctx.request.body || {});
          ctx.body = result;
        });
      }
    };

    // 注册路由
    handleRoute('/beat', () => this.beat());
    handleRoute('/idleBeat', (body) => this.idleBeat(body.jobId || -1));
    handleRoute('/run', (body) => this.run(body || {}));
    handleRoute('/kill', (body) => this.killJob(body.jobId || -1));
    handleRoute('/log', async (body) => {
      const { logDateTim, logId, fromLineNum } = body || {};
      return await this.readLog(logDateTim, logId, fromLineNum);
    });
  }

  /**
   * 心跳检测:调度中心检测执行器是否在线时使用
   * @return {{code: number, msg: string}}
   */
  beat() {
    return { code: 200, msg: 'success' }
  }

  /**
   * 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
   * @param {string} jobId - 任务ID
   * @return {{code: number, msg: string}}
   */
  idleBeat(jobId) {
    return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' })
  }

  /**
   * 触发任务执行
   * @param {number} jobId - 任务ID
   * @param {string} handlerName - 任务的handler名字
   * @param {string} jobJsonParams - 任务参数
   * @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
   * @param {number} logId - 本次调度日志ID
   * @param {number} - 本次调度日志时间
   * @return {{code: number, msg: string}}
   * 
   * 请求数据格式如下,放置在 RequestBody 中,JSON格式:
    {
        "jobId":1,                                  // 任务ID
        "executorHandler":"demoJobHandler",         // 任务标识
        "executorParams":"demoJobHandler",          // 任务参数
        "executorBlockStrategy":"COVER_EARLY",      // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum
        "executorTimeout":0,                        // 任务超时时间,单位秒,大于零时生效
        "logId":1,                                  // 本次调度日志ID
        "logDateTime":1586629003729,                // 本次调度日志时间
        "glueType":"BEAN",                          // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
        "glueSource":"xxx",                         // GLUE脚本代码
        "glueUpdatetime":1586629003727,             // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
        "broadcastIndex":0,                         // 分片参数:当前分片
        "broadcastTotal":0                          // 分片参数:总分片
    }
   *
   */
  run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {

    const jobHandler = this.jobHandlers.get(handlerName)
    if (!jobHandler) {
      return { code: 500, msg: `no matched jobHandler(${handlerName})` }
    }

    this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this))

    return { code: 200, msg: 'success' }
  }

  /**
   * 终止任务
   * @param {number} jobId - 任务ID
   * @return {{code: number, msg: string}}
   */
  killJob(jobId) {
    return { code: 500, msg: `not yet support, jobId(${jobId})` }
  }

  /**
   * 查看执行日志
   * @param {number} logDateTime - 本次调度日志时间
   * @param {number} logId - 本次调度日志ID
   * @param {number} fromLineNum - 日志开始行号
   * @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
   */
  async readLog(logDateTime, logId, fromLineNum) {
    //待实现
    return {
      code: 200, content: {
        "fromLineNum": 0,        // 本次请求,日志开始行数
        "toLineNum": 100,        // 本次请求,日志结束行号
        "logContent": "test",     // 本次请求日志内容
        "isEnd": true            // 日志是否全部加载完
      }
    }

    //let logContent
    //let toLineNum
    // try {
    //   const lines = await this.jobManager.readJobLog(logDateTime, logId)
    //   lines.splice(0, fromLineNum - 1)
    //   if (last(lines) === '') lines.pop()
    //   toLineNum = fromLineNum + lines.length - 1
    //   lines.unshift('')
    //   logContent = lines.join('\n')
    // } catch (err) {
    //   log.err('readLog error: %o', err.message)
    //   toLineNum = fromLineNum
    //   logContent = err.toString()
    // }


    //       请求数据格式如下,放置在 RequestBody 中,JSON格式:
    //     {
    //         "logDateTim":0,     // 本次调度日志时间
    //         "logId":0,          // 本次调度日志ID
    //         "fromLineNum":0     // 日志开始行号,滚动加载日志
    //     }
    // 响应数据格式:
    //     {
    //         "code":200,         // 200 表示正常、其他失败
    //         "msg": null         // 错误提示消息
    //         "content":{
    //             "fromLineNum":0,        // 本次请求,日志开始行数
    //             "toLineNum":100,        // 本次请求,日志结束行号
    //             "logContent":"xxx",     // 本次请求日志内容
    //             "isEnd":true            // 日志是否全部加载完
    //         }
    //     }


    // return { code: 200, content: { fromLineNum, toLineNum, logContent } }
  }

  /**
   * 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
   */
  async registry() {
    return axios.post(`${this.scheduleCenterUrl}/api/registry`,
      {
        registryGroup: 'EXECUTOR',
        registryKey: this.executorKey,
        registryValue: this.executorUrl
      },
      {
        headers: {
          'XXL-JOB-ACCESS-TOKEN': this.accessToken,
          'Content-Type': 'application/json'
        }
      }).then((response) => {
        // console.log('执行器注册成功:', response.data);
      }).catch(error => console.error('执行器注册失败:', error.message))
  }

  /**
   * 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
   */
  async registryRemove() {
    return axios.post(`${this.scheduleCenterUrl}/api/registryRemove`,
      {
        'registryGroup': 'EXECUTOR',
        'registryKey': this.executorKey,
        'registryValue': this.executorUrl
      },
      {
        headers: {
          'XXL-JOB-ACCESS-TOKEN': this.accessToken,
          'Content-Type': 'application/json'
        }
      }).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message))
  }

  /**
 * 任务回调:执行器执行完任务后,回调任务结果时使用
 * @param {Error|null} error 错误对象
 * @param {{logId: number, result: any ,use_time:number}} jobResult 任务结果
 */
  async callback({ error, logId, result, use_time }) {
    var handleMsg: any = JSON.stringify({ use_time, result, error: error && error.message })

    return axios.post(
      `${this.scheduleCenterUrl}/api/callback`,
      [{
        logId,
        logDateTim: Date.now(),
        handleCode: error ? 500 : 200,
        handleMsg
      }],
      {
        headers: {
          'XXL-JOB-ACCESS-TOKEN': this.accessToken,
          'Content-Type': 'application/json'
        }
      }).then((response) => response.data).catch((err) => {
        console.error(`callback error:${JSON.stringify(result)} ${err.message}`);
      })
  }
}