Commit 82903c43 Harvey

no message

1 个父辈 dc3e267c
...@@ -16,8 +16,8 @@ global.app_config.job = Object.freeze( ...@@ -16,8 +16,8 @@ global.app_config.job = Object.freeze(
}, },
port: 8088, port: 8088,
app_domain: "http://10.0.1.92:17001/", //每个客户端端口不要一样 目前支持17001-17010 ,对应的代理端口也要改成一样的(frp目下新建配置文件) app_domain: "http://10.0.1.92:17001/", //每个客户端端口不要一样 目前支持17001-17010 ,对应的代理端口也要改成一样的(frp目下新建配置文件)
enable_frpc: true, enable_frpc: true, //内网穿透 服务器代理
enable_piscina: true, enable_piscina: true, //任务交给多线程
}; };
case "localhost1": case "localhost1":
return { return {
......
import { JobManager } from './job_manager'
// const logger = require('./logger')
// const log = logger('xxl-job-executor')
import axios from 'axios';
export class Executor {
private executorKey
private scheduleCenterUrl
private accessToken
private executorUrl
private jobHandlers
private jobManager
/**
* @param {string} executorKey
* @param {string} scheduleCenterUrl
* @param {string} accessToken
* @param {string} jobLogPath
* @param {Map} jobHandlers
* @param {*} context
*/
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context) {
this.executorKey = executorKey
this.scheduleCenterUrl = scheduleCenterUrl
this.accessToken = accessToken
this.jobHandlers = jobHandlers
this.jobManager = new JobManager(jobLogPath, context)
}
/**
* 应用执行器中间件
* @param {*} app
* @param {string} domain
*/
applyMiddleware({ app, domain }) {
this.executorUrl = domain
const express = require('express')
var router = new express.Router()
//请求认证
router.use(async (req, res, next) => {
res.status(200)
const token = req.headers && req.headers['xxl-job-access-token']
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' })
return
}
if (!req.body) {
res.send({ code: 500, msg: 'body is null' })
return
}
next()
})
this.addJobRoutes(router)
app.use(router)
}
/**
* 添加xxl-job相关的路由,供调度中心访问
* @param {express.Router} router
* @param {string} baseUri
*/
addJobRoutes(router) {
router.post(`/beat`, async (req, res, next) => {
res.send(this.beat())
})
router.post(`/idleBeat`, async (req, res, next) => {
res.send(this.idleBeat(req.body.jobId || -1))
})
router.post(`/run`, async (req, res, next) => {
res.send(this.run(req.body || {}))
})
router.post(`/kill`, async (req, res, next) => {
res.send(this.killJob(req.body.jobId || -1))
})
router.post(`/log`, async (req, res, next) => {
const { logDateTim, logId, fromLineNum } = req.body || {}
const data = await this.readLog(logDateTim, logId, fromLineNum)
res.send(data)
})
}
/**
* 心跳检测:调度中心检测执行器是否在线时使用
* @return {{code: number, msg: string}}
*/
beat() {
return { code: 200, msg: 'success' }
}
/**
* 忙碌检测:调度中心检测指定执行器上指定任务是否忙碌(运行中)时使用
* @param {string} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' })
}
/**
* 触发任务执行
* @param {number} jobId - 任务ID
* @param {string} handlerName - 任务的handler名字
* @param {string} jobJsonParams - 任务参数
* @param {number} executorTimeout - 任务超时时间,单位秒,大于零时生效
* @param {number} logId - 本次调度日志ID
* @param {number} - 本次调度日志时间
* @return {{code: number, msg: string}}
*/
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
const jobHandler = this.jobHandlers.get(handlerName)
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` }
}
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this))
return { code: 200, msg: 'success' }
}
/**
* 终止任务
* @param {number} jobId - 任务ID
* @return {{code: number, msg: string}}
*/
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` }
}
/**
* 查看执行日志
* @param {number} logDateTime - 本次调度日志时间
* @param {number} logId - 本次调度日志ID
* @param {number} fromLineNum - 日志开始行号
* @return {*} - fromLineNum:日志开始行号; toLineNum:日志结束行号; logContent:日志内容
*/
async readLog(logDateTime, logId, fromLineNum) {
//待实现
return {
code: 200, content: {
"fromLineNum": 0, // 本次请求,日志开始行数
"toLineNum": 100, // 本次请求,日志结束行号
"logContent": "test", // 本次请求日志内容
"isEnd": true // 日志是否全部加载完
}
}
//let logContent
//let toLineNum
// try {
// const lines = await this.jobManager.readJobLog(logDateTime, logId)
// lines.splice(0, fromLineNum - 1)
// if (last(lines) === '') lines.pop()
// toLineNum = fromLineNum + lines.length - 1
// lines.unshift('')
// logContent = lines.join('\n')
// } catch (err) {
// log.err('readLog error: %o', err.message)
// toLineNum = fromLineNum
// logContent = err.toString()
// }
// 请求数据格式如下,放置在 RequestBody 中,JSON格式:
// {
// "logDateTim":0, // 本次调度日志时间
// "logId":0, // 本次调度日志ID
// "fromLineNum":0 // 日志开始行号,滚动加载日志
// }
// 响应数据格式:
// {
// "code":200, // 200 表示正常、其他失败
// "msg": null // 错误提示消息
// "content":{
// "fromLineNum":0, // 本次请求,日志开始行数
// "toLineNum":100, // 本次请求,日志结束行号
// "logContent":"xxx", // 本次请求日志内容
// "isEnd":true // 日志是否全部加载完
// }
// }
// return { code: 200, content: { fromLineNum, toLineNum, logContent } }
}
/**
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/
async registry() {
return axios.post(`${this.scheduleCenterUrl}/api/registry`,
{
registryGroup: 'EXECUTOR',
registryKey: this.executorKey,
registryValue: this.executorUrl
},
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => {
// console.log('执行器注册成功:', response.data);
}).catch(error => console.error('执行器注册失败:', error.message))
}
/**
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/
async registryRemove() {
return axios.post(`${this.scheduleCenterUrl}/api/registryRemove`,
{
'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey,
'registryValue': this.executorUrl
},
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message))
}
/**
* 任务回调:执行器执行完任务后,回调任务结果时使用
* @param {Error|null} error 错误对象
* @param {{logId: number, result: any ,use_time:number}} jobResult 任务结果
*/
async callback(error, { logId, result, use_time }) {
var handleMsg: any = JSON.stringify({ use_time, result, error: error && error.message })
return axios.post(
`${this.scheduleCenterUrl}/api/callback`,
[{
logId,
logDateTim: Date.now(),
handleCode: error ? 500 : 200,
handleMsg
}],
{
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => response.data).catch((err) => {
console.error(`callback error:${JSON.stringify(result)} ${err.message}`);
})
}
}
const { once } = require('events')
const fs = require('fs')
const readline = require('readline')
const mkdir = (path) => !fs.existsSync(path) && fs.mkdirSync(path, { recursive: true })
const searchInFile = async (filePath, content, endContent) => {
const rl = readline.createInterface({ input: fs.createReadStream(filePath), crlfDelay: Infinity })
const reg = content ? new RegExp(content, 'i') : undefined
const endReg = endContent ? new RegExp(endContent, 'i') : undefined
const lines = []
let stop = false
rl.on('line', (line) => {
if (stop) {
rl.close()
return
}
reg && reg.test(line) && lines.push(line)
endReg && endReg.test(line) && (stop = true) && rl.close()
})
await once(rl, 'close')
return lines
}
module.exports = {
mkdir,
searchInFile,
}
此文件太大,无法显示。
serverAddr = "118.89.26.241"
serverPort = 17000
auth.token="aolei@harvey"
[[proxies]]
name = "test-tcp"
type = "tcp"
localIP = "127.0.0.1"
localPort = 8088
remotePort = 17001
\ No newline at end of file \ No newline at end of file
serverAddr = "118.89.26.241"
serverPort = 17000
auth.token="aolei@harvey"
[[proxies]]
name = "test-tcp"
type = "tcp"
localIP = "127.0.0.1"
localPort = 8088
remotePort = 17002
\ No newline at end of file \ No newline at end of file
const { spawn } = require('child_process');
console.log('启动 frpc ...');
// 1. 启动 frpc 进程
const frpc = spawn(`${__dirname}/frpc`, ['-c', `${__dirname}/frpc_${process.env.NAME}.toml`], {
stdio: 'inherit' // 共享输入输出
});
// 2. 监听 Node.js 退出事件并关闭 frpc
function cleanup() {
if (!frpc.killed) {
frpc.kill('SIGTERM'); // 发送终止信号
}
}
// 捕获各种退出信号
process.on('exit', cleanup);
process.on('SIGINT', () => {
cleanup();
process.exit();
});
process.on('SIGTERM', () => {
cleanup();
process.exit();
});
// 3. 监听 frpc 的退出
frpc.on('exit', (code) => {
console.log(`frpc 已退出,代码: ${code}`);
});
\ No newline at end of file \ No newline at end of file
import { Executor } from './executor';
/**
* XxlJobExecutor
*/
export class XxlJobExecutor {
private executor
private registryInterval
/**
* 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
* @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis }
*/
constructor(opts, jobHandlers, context = undefined) {
const {
XXL_JOB_EXECUTOR_KEY: executorKey,
XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl,
XXL_JOB_ACCESS_TOKEN: accessToken,
XXL_JOB_JOB_LOG_PATH: jobLogPath,
} = opts;
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context)
}
/**
* 应用执行器组件
* @param {Object} args
* @param {any} args.app 执行器server, express
* @param {string} args.appDomain 执行器 server 地址
*/
public applyMiddleware({ app, domain }) {
this.executor.applyMiddleware({ app, domain })
const registry = this.executor.registry.bind(this.executor)
registry() && (this.registryInterval = setInterval(registry, 30000))
}
/**
* 关闭服务前应调用该方法,将执行器从调度中心摘除
*/
async close() {
if (this.registryInterval) {
clearInterval(this.registryInterval)
}
return this.executor.registryRemove()
}
}
\ No newline at end of file \ No newline at end of file
import * as moment from 'moment'
import * as Path from 'path'
// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')
/**
* 任务管理
*/
export class JobManager {
context
runningJobs
jobLogPath
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor(jobLogPath, context) {
// mkdir(jobLogPath)
this.jobLogPath = jobLogPath
this.context = context
this.runningJobs = new Set()
}
/**
* 根据调度时间获取日志文件路径
* @param {number} dateTime
* @return {string}
*/
getLogFilePath(dateTime) {
return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`)
}
/**
* 构造任务logger的namespace
* @param {string} handlerName
* @param {number} dateTime
* @param {number} logId
* @return {string}
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`
}
/**
* @param {{number}} jobId
* @return {boolean}
*/
hasJob(jobId) {
return this.runningJobs.has(jobId)
}
async runJob(
jobId, // 任务ID
jobJsonParams, // 任务参数(JSON格式)
logId, // 日志ID
logDateTime, // 日志时间
executorTimeout, // 执行超时时间(秒)
handlerName, // 处理器名称
jobHandler, // 任务处理函数
callback // 回调函数
) {
// 1. 初始化日志记录器
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let result, timeoutTimer, error
let use_time = new Date().getTime()
try {
if (this.hasJob(jobId)) {
throw new Error('已有相同任务正在运行')
}
this.runningJobs.add(jobId)
let jobParams
try {
jobParams = JSON.parse(jobJsonParams);
} catch {
jobJsonParams = {}
}
if (executorTimeout) {
timeoutTimer = setTimeout(
() => this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: new Date().getTime() - use_time,
result: null,
error: new Error('任务执行超时')
}),
executorTimeout * 1000
);
}
result = await jobHandler(jobParams, this.context);
if (result && result.error) {
error = result.error
delete result.error
if (Object.keys(result).length === 0) {
result = undefined
}
}
} catch (err) {
error = err
}
await this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: new Date().getTime() - use_time,
result,
error
});
}
/**
* @param {number} logDateTime
* @param {number} logId
* @return {Promise<Array>}
*/
async readJobLog(logDateTime, logId) {
const logFilePath = this.getLogFilePath(logDateTime)
const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' '
return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : []
}
/**
* @param {number} jobId
* @param {number} logId
* @param {*} jobLogger
* @param {function} callback
* @param {number} timeoutTimer
* @param {number} use_time
* @param {*} result
* @param {*} error
* @return {Promise<void>}
*/
async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
try {
timeoutTimer && clearTimeout(timeoutTimer)
await callback(error, { logId, use_time, result })
} catch (err) {
console.log(`finishJob error: ${err.message}`)
} finally {
this.runningJobs.delete(jobId)
}
}
}
\ No newline at end of file \ No newline at end of file
// import debug from 'debug';
// import fs from 'fs';
// import os from 'os';
// import util from 'util';
// // 类型定义
// type LogLevel = 'error' | 'info' | 'warn' | 'debug' | 'trace';
// type LoggerMethods = {
// [key in LogLevel]: debug.Debugger;
// } & {
// close: () => void;
// };
// interface LoggerOptions {
// namespace: string;
// logFilePath?: string;
// enabledLevels?: string;
// debugEnabled?: boolean;
// }
// // 配置解析
// const parseEnvBool = (envVar: string): boolean => {
// const value = process.env[envVar] || '';
// return /^(yes|on|true|enable|enabled|1)$/i.test(value);
// };
// const getEnvValue = (envVar: string, defaultValue: string): string => {
// return process.env[envVar] !== undefined ? process.env[envVar] : defaultValue;
// };
// // 默认配置
// const DEFAULT_LOG_LEVELS = 'info:*,warn:*,error:*,debug:*,trace:*';
// const WRITE_STREAM_OPTIONS: fs.WriteStreamOptions = {
// flags: 'a',
// encoding: 'utf8',
// autoClose: true,
// emitClose: true
// };
// // 空日志函数
// const noop = (..._args: any[]): void => {};
// const noopLogger: LoggerMethods = {
// error: noop,
// info: noop,
// warn: noop,
// debug: noop,
// trace: noop,
// close: noop
// };
// // 创建基础日志函数
// const createBaseLoggers = (namespace: string): Omit<LoggerMethods, 'close'> => {
// return {
// error: debug('error').extend(namespace),
// info: debug('info').extend(namespace),
// warn: debug('warn').extend(namespace),
// debug: debug('debug').extend(namespace),
// trace: debug('trace').extend(namespace)
// };
// };
// // 配置日志级别
// const configureLogLevels = (
// logger: Omit<LoggerMethods, 'close'>,
// enabledLevels: string
// ): void => {
// Object.entries(logger).forEach(([level]) => {
// logger[level as LogLevel].enabled = enabledLevels.includes(level);
// });
// };
// // 创建文件日志写入器
// const createFileLogger = (
// logFilePath: string,
// logger: Omit<LoggerMethods, 'close'>,
// enabledLevels: string
// ): LoggerMethods => {
// const writeStream = fs.createWriteStream(logFilePath, WRITE_STREAM_OPTIONS);
// const logToFile = (...args: any[]): void => {
// writeStream.write(`${util.format(...args)}${os.EOL}`);
// };
// const logToConsoleAndFile = (...args: any[]): void => {
// const message = util.format(...args);
// console.error(message);
// writeStream.write(`${message}${os.EOL}`);
// };
// // 配置日志输出方式
// Object.entries(logger).forEach(([level, logFn]) => {
// logFn.log = enabledLevels.includes(level) ? logToConsoleAndFile : logToFile;
// });
// // 返回完整logger对象
// return {
// ...logger,
// close: (): void => {
// writeStream.end();
// // 切换回仅控制台日志
// Object.values(logger).forEach((logFn) => {
// logFn.log = console.error;
// });
// }
// };
// };
// // 主导出函数
// export const createLogger = (
// options: LoggerOptions | string
// ): LoggerMethods => {
// // 处理参数重载
// const normalizedOptions = typeof options === 'string'
// ? { namespace: options }
// : options;
// const {
// namespace,
// logFilePath,
// enabledLevels = getEnvValue('DEBUG', DEFAULT_LOG_LEVELS),
// debugEnabled = parseEnvBool('XXL_JOB_DEBUG_LOG')
// } = normalizedOptions;
// if (!debugEnabled && !logFilePath) {
// return noopLogger;
// }
// const logger = createBaseLoggers(namespace);
// // 配置日志级别
// configureLogLevels(logger, enabledLevels);
// if (!logFilePath) {
// return {
// ...logger,
// close: noop
// };
// }
// // 文件日志配置
// return createFileLogger(logFilePath, logger, enabledLevels);
// };
// // 默认导出
// export default createLogger;
\ No newline at end of file \ No newline at end of file
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!