Commit ae5e8339 Harvey

no message

1 个父辈 c70b8080
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.init = init;
exports.add = add;
exports.act = act;
const _ = require("lodash");
const path = require("path");
const rp = require("request-promise");
const fastify = require("fastify");
const common_1 = require("../common");
const log_1 = require("../log");
const service_1 = require("../service");
let logger;
const api = {};
const fast = fastify({ trustProxy: true });
let targetPort = 0;
function init(cfg, callback) {
logger = new log_1.Log(`msbase_fastify`);
if (cfg.targetPort)
targetPort = cfg.targetPort;
const start = () => {
if (!cfg.port)
return;
try {
_.forEach(cfg.apipath ? cfg.apipath : ['./build/services/'], common_1.requiredir);
}
catch (error) {
logger.log(error.message);
}
logger.log('api initialized');
let circuitBreakerOpt = { threshold: 3, timeout: 3000, resetTimeout: 3000 };
if (cfg.circuitBreaker)
circuitBreakerOpt = { ...circuitBreakerOpt, ...cfg.circuitBreaker };
fast.register(require('fastify-circuit-breaker'), circuitBreakerOpt);
fast.register(require('fastify-compress'), { global: false });
try {
const route = cfg.routefile ? require(path.resolve(cfg.routefile)) : null;
if (route)
route(fast);
}
catch (error) {
logger.log(error.message);
}
_.keys(api).length > 0 && fast.register(function (instance, opts, next) {
instance.route({
method: 'POST',
url: '/:cmd/',
handler: async (request, reply) => {
let payload = {};
const start = Date.now();
const name = request.params.cmd.toLowerCase();
const bodyJson = JSON.stringify(request.body);
logger.debug(`1-api-${name} args-${bodyJson}`, { method_name: name });
try {
const action = api[name];
if (!action)
throw new Error(`${name} does not exist in ${cfg.name}`);
payload.result = await action(request.body);
}
catch (error) {
payload.error = error.message;
}
const hs = Date.now() - start;
let spend_time = Math.floor(hs / 50);
if (spend_time > 99)
spend_time = 99;
logger.debug(`2-api-${name} args-${bodyJson} result-${JSON.stringify(payload)} 耗时-${hs}`, { method_name: name, spend_time: spend_time });
reply.compress(payload);
}
});
console.log('start ms api route');
next();
});
fast.listen(cfg.port, '0.0.0.0', (err, address) => {
if (err)
throw err;
logger.info(`server listening on ${address}`);
});
};
callback ? callback(start) : start();
}
function add(args, callback) {
api[args.cmd] = callback;
console.log(`api '${args.cmd}' success`);
}
async function act(args, isThrowError = false) {
if (!args.topic || !args.cmd)
throw new Error('topic or cmd is null');
if (args.topic)
args.topic = args.topic.toLowerCase();
if (args.cmd)
args.cmd = args.cmd.toLowerCase();
const args_obj = _.pick(args, ['data', 'context']);
const body = JSON.stringify(args_obj);
if (args.topic === service_1.config.name)
return api[args.cmd]({ ...args_obj });
const url = `http://service-${args.topic}${targetPort ? (':' + targetPort) : ''}/${args.cmd}/`;
const start = Date.now();
let ret = await rp.post(url, { body, gzip: true, forever: true, headers: { 'Content-Type': 'application/json' } })
.then(body => JSON.parse(body))
.catch(e => { return { error: e.message }; });
const hs = Date.now() - start;
let spend_time = Math.floor(hs / 50);
if (spend_time > 99)
spend_time = 99;
const msg = `act--${url}|${body}|${JSON.stringify(ret)}|${hs}`;
const opts = { method_name: args.cmd, spend_time: spend_time };
logger.debug(msg, opts);
if (ret) {
if ('error' in ret) {
logger.error(msg, opts);
if (isThrowError)
throw new Error(ret.error);
}
if ('result' in ret) {
ret = ret.result;
}
}
return ret;
}
//# sourceMappingURL=fast.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.init = init;
exports.act = act;
const nats_1 = require("nats");
const Hemera = require('nats-hemera');
let hemera;
let logger;
function init(cfg) {
logger = console;
hemera = new Hemera((0, nats_1.connect)(cfg.nats), {
...cfg.hemera
});
logger.info('msbase hemera log init success');
try {
const HemeraZipkin = require('hemera-zipkin');
hemera.use(HemeraZipkin, {
host: process.env.ZIPKIN_URL,
port: process.env.ZIPKIN_PORT,
sampling: 1
});
}
catch (e) {
logger.info('zipkin未配置: ');
}
hemera.ready(async () => {
logger.log('hemera initialized');
});
return hemera;
}
async function act(args, isThrowError = false) {
const starttime = Date.now();
let localfn = hemera.router.lookup(args);
if (localfn)
localfn = localfn.action;
if (!args.context)
args.context = {};
let ret = await (localfn ?
localfn(args)
: hemera.act(args)).catch((error) => ({ error: error.message, code: 1001 }));
if (ret && ret.context)
ret = ret.data;
if (ret && ret.error)
logger.error(ret.error);
if (isThrowError && ret && ret.error)
throw new Error(ret.error);
const total = Date.now() - starttime;
if (total > 200) {
logger.log(`${JSON.stringify(args)} 调用时间耗时:${total}毫秒 `);
}
return ret;
}
//# sourceMappingURL=hemera.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.init = init;
exports.add = add;
exports.act = act;
var adapter = require('./hemera');
var config = {};
function init(cfg, callback) {
switch (cfg.type) {
case 'fast':
adapter = require('./fast');
break;
default:
adapter = require('./hemera');
break;
}
config = cfg;
adapter.init(cfg, callback);
}
function add(args, callback) {
if (adapter) {
adapter.add(ckArgs(args), callback);
}
}
async function act(args, isThrowError = false) {
return adapter.act(ckArgs(args), isThrowError);
}
function ckArgs(args) {
if (!args)
return;
if (!args.topic)
args.topic = config.name;
if (args.topic)
args.topic = args.topic.toLowerCase();
if (args.cmd)
args.cmd = args.cmd.toLowerCase();
return args;
}
//# sourceMappingURL=index.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: 'express',
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=express-test.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: 'express',
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=express.js.map
\ No newline at end of file \ No newline at end of file
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
exports.JobExecutor = void 0; const job_executor_1 = require("job-executor");
class JobExecutor { const config = require('../config');
jobHandlers = new Map(); var jobExecutor = new job_executor_1.JobExecutor(config);
constructor(opts) { jobExecutor.register("demoJobHandler", async (args) => {
if (opts.frpcConfigPath) { for (let i = 1; i < 10; i++) {
require("./xxl-job/frpc").createFrpc(opts.frpcConfigPath); await new Promise((resolve, reject) => {
} setTimeout(resolve, 888);
const app = new (require("./xxl-job/index").XxlJobExecutor)(opts.xxlJob, this.jobHandlers).applyMiddleware({
appType: opts.appType,
domain: opts.appDomain
});
app.listen(opts.port, () => {
console.log(`[XXL-JOB] Executor ${opts.appType} "${opts.xxlJob.executorKey}" is running`);
console.log(`- Local: http://localhost:${opts.port}`);
console.log(`- Public: ${opts.appDomain}`);
}); });
} }
register(jobName, handler) { return { result: 'return value' };
if (this.jobHandlers.has(jobName)) { });
throw new Error(`Job handler "${jobName}" is already registered`); const batchJob = [
} [
if (typeof handler !== 'function') { "demoJobHandler1",
throw new Error(`Job handler "${jobName}" must be a function`); async (args) => {
} for (let i = 1; i < 10; i++) {
console.log(`[XXL-JOB] Job "${jobName}" registered`); await new Promise((resolve, reject) => {
this.jobHandlers.set(jobName, async (...args) => { setTimeout(resolve, 888);
try { });
return await handler(...args);
} }
catch (error) { return { result: 'return value' };
console.error(`[XXL-JOB] Job "${jobName}" failed:`, error);
return { error };
} }
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
}); });
} }
registerBatch(handlers) { return { result: 'return value' };
handlers.forEach((handler) => this.register(handler[0], handler[1]));
}
getJobHandler(jobName) {
return this.jobHandlers.get(jobName);
} }
} ]
exports.JobExecutor = JobExecutor; ];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=index.js.map //# sourceMappingURL=index.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: "koa",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=koa.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("./index");
const path = require("path");
const config = {
appType: "express",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq",
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml')
};
var jobExecutor = new index_1.JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
});
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
});
}
return { result: 'return value' };
}
]
];
jobExecutor.registerBatch(batchJob);
//# sourceMappingURL=test.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Executor = void 0;
const job_manager_1 = require("./job-manager");
const axios_1 = require("axios");
class Executor {
executorKey;
scheduleCenterUrl;
accessToken;
executorUrl;
jobHandlers;
jobManager;
constructor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers) {
this.executorKey = executorKey;
this.scheduleCenterUrl = scheduleCenterUrl;
this.accessToken = accessToken;
this.jobHandlers = jobHandlers;
this.jobManager = new job_manager_1.JobManager(jobLogPath);
}
applyMiddleware({ appType, domain }) {
this.executorUrl = domain;
if (appType === 'express') {
return this.applyExpressMiddleware();
}
else if (appType === 'koa') {
return this.applyKoaMiddleware();
}
else {
throw new Error(`Unsupported appType: ${appType}`);
}
}
applyExpressMiddleware() {
const express = require('express');
const app = express().use(require('body-parser').json());
const router = express.Router();
router.use(async (req, res, next) => {
res.status(200);
const token = req.headers && req.headers['xxl-job-access-token'];
if (!!this.accessToken && this.accessToken !== token) {
res.send({ code: 500, msg: 'access token incorrect' });
return;
}
if (!req.body) {
res.send({ code: 500, msg: 'body is null' });
return;
}
next();
});
this.addJobRoutes(router, 'express');
app.use(router);
return app;
}
applyKoaMiddleware() {
const Koa = require('koa');
const app = new Koa();
app.use(require('koa-bodyparser')());
const Router = require('koa-router');
const router = new Router();
const bodyParser = require('koa-bodyparser');
router.use(async (ctx, next) => {
ctx.status = 200;
const token = ctx.headers && ctx.headers['xxl-job-access-token'];
if (!!this.accessToken && this.accessToken !== token) {
ctx.body = { code: 500, msg: 'access token incorrect' };
return;
}
if (!ctx.request.body) {
ctx.body = { code: 500, msg: 'body is null' };
return;
}
await next();
});
this.addJobRoutes(router, 'koa');
app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());
return app;
}
addJobRoutes(router, appType) {
const handleRoute = (path, handler) => {
if (appType === 'express') {
router.post(path, async (req, res) => {
const result = handler(req.body || {});
res.send(result);
});
}
else if (appType === 'koa') {
router.post(path, async (ctx) => {
const result = handler(ctx.request.body || {});
ctx.body = result;
});
}
};
handleRoute('/beat', () => this.beat());
handleRoute('/idleBeat', (body) => this.idleBeat(body.jobId || -1));
handleRoute('/run', (body) => this.run(body || {}));
handleRoute('/kill', (body) => this.killJob(body.jobId || -1));
handleRoute('/log', async (body) => {
const { logDateTim, logId, fromLineNum } = body || {};
return await this.readLog(logDateTim, logId, fromLineNum);
});
}
beat() {
return { code: 200, msg: 'success' };
}
idleBeat(jobId) {
return (this.jobManager.hasJob(jobId) ? { code: 500, msg: 'busy' } : { code: 200, msg: 'idle' });
}
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
const jobHandler = this.jobHandlers.get(handlerName);
if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` };
}
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
return { code: 200, msg: 'success' };
}
killJob(jobId) {
return { code: 500, msg: `not yet support, jobId(${jobId})` };
}
async readLog(logDateTime, logId, fromLineNum) {
return {
code: 200, content: {
"fromLineNum": 0,
"toLineNum": 100,
"logContent": "test",
"isEnd": true
}
};
}
async registry() {
return axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, {
registryGroup: 'EXECUTOR',
registryKey: this.executorKey,
registryValue: this.executorUrl
}, {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => {
}).catch(error => console.error('执行器注册失败:', error.message));
}
async registryRemove() {
return axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, {
'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey,
'registryValue': this.executorUrl
}, {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => console.log('执行器移除成功:', response.data)).catch(error => console.error('执行器移除失败:', error.message));
}
async callback({ error, logId, result, use_time }) {
var handleMsg = JSON.stringify({ use_time, result, error: error && error.message });
return axios_1.default.post(`${this.scheduleCenterUrl}/api/callback`, [{
logId,
logDateTim: Date.now(),
handleCode: error ? 500 : 200,
handleMsg
}], {
headers: {
'XXL-JOB-ACCESS-TOKEN': this.accessToken,
'Content-Type': 'application/json'
}
}).then((response) => response.data).catch((err) => {
console.error(`callback error:${JSON.stringify(result)} ${err.message}`);
});
}
}
exports.Executor = Executor;
//# sourceMappingURL=executor.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.createFrpc = createFrpc;
function createFrpc(configPath) {
const { spawn } = require("child_process");
const path = require('path');
console.log("启动 frpc ...");
const frpc = spawn(path.join(__dirname, '..', '..', 'frp', 'frpc'), ["-c", configPath], {
stdio: "inherit",
});
function cleanup() {
if (!frpc.killed) {
frpc.kill("SIGTERM");
}
}
process.on("exit", cleanup);
process.on("SIGINT", () => {
cleanup();
process.exit();
});
process.on("SIGTERM", () => {
cleanup();
process.exit();
});
frpc.on("exit", (code) => {
console.log(`frpc 已退出,代码: ${code}`);
});
}
//# sourceMappingURL=frpc.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.XxlJobExecutor = void 0;
const executor_1 = require("./executor");
class XxlJobExecutor {
executor;
registryInterval;
constructor(opts, jobHandlers) {
const { executorKey, scheduleCenterUrl, accessToken, jobLogPath, } = opts;
this.executor = new executor_1.Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers);
}
applyMiddleware({ appType, domain }) {
const app = this.executor.applyMiddleware({ appType, domain });
const registry = this.executor.registry.bind(this.executor);
registry() && (this.registryInterval = setInterval(registry, 30000));
return app;
}
async close() {
if (this.registryInterval) {
clearInterval(this.registryInterval);
}
return this.executor.registryRemove();
}
}
exports.XxlJobExecutor = XxlJobExecutor;
//# sourceMappingURL=index.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.JobManager = void 0;
class JobManager {
runningJobs;
jobLogPath;
constructor(jobLogPath) {
this.jobLogPath = jobLogPath;
this.runningJobs = new Set();
}
getLogFilePath(dateTime) {
}
getJobLoggerNamespace(handlerName, dateTime, logId) {
}
hasJob(jobId) {
return this.runningJobs.has(jobId);
}
async runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
let result, timeoutTimer, error;
const startTime = Date.now();
try {
if (this.hasJob(jobId)) {
throw new Error('已有相同任务正在运行');
}
this.runningJobs.add(jobId);
let jobParams = {};
try {
jobParams = JSON.parse(jobJsonParams) || {};
}
catch { }
if (executorTimeout) {
timeoutTimer = setTimeout(() => this.finishJob({
jobId,
logId,
callback,
timeoutTimer: null,
use_time: Date.now() - startTime,
result: null,
error: new Error('任务执行超时')
}), executorTimeout * 1000);
}
result = await jobHandler(jobParams);
if (result && result.error) {
error = result.error;
delete result.error;
if (Object.keys(result).length === 0) {
result = undefined;
}
}
}
catch (err) {
error = err;
}
finally {
await this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: Date.now() - startTime,
result,
error
});
}
}
async readJobLog(logDateTime, logId) {
}
async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
try {
timeoutTimer && clearTimeout(timeoutTimer);
await callback({ error, logId, use_time, result });
}
catch (err) {
console.error(`完成任务时发生错误: ${err.message}`);
}
finally {
this.runningJobs.delete(jobId);
}
}
}
exports.JobManager = JobManager;
//# sourceMappingURL=job-manager.js.map
\ No newline at end of file \ No newline at end of file
let config;
try {
config = JSON.parse(process.env.CONFIG);
} catch (e) {
// appType: "express",
// appDomain: "http://pd-service-info-task:8088/",
// port: 8088,
// xxlJob: {
// executorKey: "hemera-job",
// scheduleCenterUrl: "http://10.0.4.5:8080/xxl-job-admin",
// accessToken: "default_token",
// jobLogPath: "logs/job",
// enableDebugLog: false,
// },
const path = require("path");
config = {
appType: "express",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq", // 注:开发环境建议加后缀区分
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), "frp/frpc.toml"), //根目录下文件夹frp中的frpc.toml
};
}
module.exports = config;
此文件太大,无法显示。
{ {
"name": "job-executor", "name": "job-executor-hemera",
"version": "1.0.6", "version": "1.0.6",
"description": "xxl-job execution framework for nodejs", "description": "xxl-job execution framework for nodejs",
"main": "build/index.js", "main": "build/index.js",
...@@ -11,15 +11,15 @@ ...@@ -11,15 +11,15 @@
"author": "harvey", "author": "harvey",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"axios": "^1.9.0" "axios": "^1.9.0",
"body-parser": "^2.2.0",
"express": "^5.1.0",
"job-executor": "git+http://git.ydniu.cn/zhanghw/job-executor.git#master",
"lodash": "^4.17.21",
"nats-hemera": "^7.0.2"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^22.4.1", "@types/node": "^22.4.1",
"body-parser": "^2.2.0",
"express": "^5.1.0",
"koa": "^2.4.1",
"koa-bodyparser": "^4.4.1",
"koa-router": "^13.0.1",
"typescript": "^5.8.3" "typescript": "^5.8.3"
} }
} }
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.act = exports.add = exports.init = void 0;
const _ = require("lodash");
const path = require("path");
const rp = require("request-promise");
const fastify = require("fastify");
const common_1 = require("../common");
const log_1 = require("../log");
const service_1 = require("../service");
let logger;
const api = {};
const fast = fastify({ trustProxy: true });
let targetPort = 0;
function init(cfg, callback) {
logger = new log_1.Log(`msbase_fastify`);
if (cfg.targetPort)
targetPort = cfg.targetPort;
const start = () => {
if (!cfg.port)
return;
try {
_.forEach(cfg.apipath ? cfg.apipath : ['./build/services/'], common_1.requiredir);
}
catch (error) {
logger.log(error.message);
}
logger.log('api initialized');
// 路由 及 熔断
let circuitBreakerOpt = { threshold: 3, timeout: 3000, resetTimeout: 3000 };
if (cfg.circuitBreaker)
circuitBreakerOpt = Object.assign(Object.assign({}, circuitBreakerOpt), cfg.circuitBreaker);
fast.register(require('fastify-circuit-breaker'), circuitBreakerOpt);
// 压缩
fast.register(require('fastify-compress'), { global: false });
try {
const route = cfg.routefile ? require(path.resolve(cfg.routefile)) : null;
if (route)
route(fast);
}
catch (error) {
logger.log(error.message);
}
_.keys(api).length > 0 && fast.register(function (instance, opts, next) {
instance.route({
method: 'POST',
url: '/:cmd/',
// beforeHandler: instance.circuitBreaker(),
handler: (request, reply) => __awaiter(this, void 0, void 0, function* () {
let payload = {};
const start = Date.now();
const name = request.params.cmd.toLowerCase();
const bodyJson = JSON.stringify(request.body);
logger.debug(`1-api-${name} args-${bodyJson}`, { method_name: name });
try {
const action = api[name];
if (!action)
throw new Error(`${name} does not exist in ${cfg.name}`);
payload.result = yield action(request.body);
}
catch (error) {
payload.error = error.message;
}
const hs = Date.now() - start;
let spend_time = Math.floor(hs / 50);
if (spend_time > 99)
spend_time = 99;
logger.debug(`2-api-${name} args-${bodyJson} result-${JSON.stringify(payload)} 耗时-${hs}`, { method_name: name, spend_time: spend_time });
// reply.send(payload)
reply.compress(payload);
})
});
console.log('start ms api route');
next();
});
fast.listen(cfg.port, '0.0.0.0', (err, address) => {
if (err)
throw err;
logger.info(`server listening on ${address}`);
});
};
callback ? callback(start) : start();
}
exports.init = init;
function add(args, callback) {
api[args.cmd] = callback;
console.log(`api '${args.cmd}' success`);
}
exports.add = add;
function act(args, isThrowError = false) {
return __awaiter(this, void 0, void 0, function* () {
if (!args.topic || !args.cmd)
throw new Error('topic or cmd is null');
if (args.topic)
args.topic = args.topic.toLowerCase();
if (args.cmd)
args.cmd = args.cmd.toLowerCase();
const args_obj = _.pick(args, ['data', 'context']);
const body = JSON.stringify(args_obj);
if (args.topic === service_1.config.name)
return api[args.cmd](Object.assign({}, args_obj));
const url = `http://service-${args.topic}${targetPort ? (':' + targetPort) : ''}/${args.cmd}/`;
const start = Date.now();
let ret = yield rp.post(url, { body, gzip: true, forever: true, headers: { 'Content-Type': 'application/json' } })
.then(body => JSON.parse(body))
.catch(e => { return { error: e.message }; });
const hs = Date.now() - start;
let spend_time = Math.floor(hs / 50);
if (spend_time > 99)
spend_time = 99;
const msg = `act--${url}|${body}|${JSON.stringify(ret)}|${hs}`;
const opts = { method_name: args.cmd, spend_time: spend_time };
logger.debug(msg, opts);
if (ret) {
if ('error' in ret) {
logger.error(msg, opts);
if (isThrowError)
throw new Error(ret.error);
}
if ('result' in ret) {
ret = ret.result;
}
}
return ret;
});
}
exports.act = act;
//# sourceMappingURL=fast.js.map
\ No newline at end of file \ No newline at end of file
import _ = require('lodash')
import * as path from 'path'
import * as rp from 'request-promise'
import * as fastify from 'fastify'
import { requiredir } from '../common'
import { Log } from '../log'
import { config } from '../service';
let logger: Log
const api = {}
const fast = fastify({ trustProxy: true })
let targetPort = 0
export function init(cfg: any, callback?: Function) {
logger = new Log(`msbase_fastify`)
if (cfg.targetPort) targetPort = cfg.targetPort
const start = () => {
if (!cfg.port) return
try {
_.forEach(cfg.apipath ? cfg.apipath : ['./build/services/'], requiredir)
} catch (error) { logger.log(error.message); }
logger.log('api initialized');
// 路由 及 熔断
let circuitBreakerOpt = { threshold: 3, timeout: 3000, resetTimeout: 3000 }
if (cfg.circuitBreaker) circuitBreakerOpt = { ...circuitBreakerOpt, ...cfg.circuitBreaker }
fast.register(require('fastify-circuit-breaker'), circuitBreakerOpt)
// 压缩
fast.register(require('fastify-compress'), { global: false });
try {
const route = cfg.routefile ? require(path.resolve(cfg.routefile)) : null;
if (route) route(fast)
} catch (error) { logger.log(error.message); }
_.keys(api).length > 0 && fast.register(function (instance: any, opts, next) {
instance.route({
method: 'POST',
url: '/:cmd/',
// beforeHandler: instance.circuitBreaker(),
handler: async (request, reply) => {
let payload: any = {}
const start = Date.now()
const name = request.params.cmd.toLowerCase()
const bodyJson = JSON.stringify(request.body)
logger.debug(`1-api-${name} args-${bodyJson}`, { method_name: name })
try {
const action = api[name]
if (!action) throw new Error(`${name} does not exist in ${cfg.name}`)
payload.result = await action(request.body);
} catch (error) {
payload.error = error.message
}
const hs = Date.now() - start
let spend_time = Math.floor(hs / 50)
if (spend_time > 99) spend_time = 99
logger.debug(`2-api-${name} args-${bodyJson} result-${JSON.stringify(payload)} 耗时-${hs}`, { method_name: name, spend_time: spend_time })
// reply.send(payload)
reply.compress(payload)
}
})
console.log('start ms api route')
next()
});
fast.listen(cfg.port, '0.0.0.0', (err, address) => {
if (err) throw err
logger.info(`server listening on ${address}`)
})
}
callback ? callback(start) : start()
}
interface Pattern {
pubsub$?: boolean
timeout$?: number
maxMessages$?: number
expectedMessages$?: number
[key: string]: any
}
export function add(args: Pattern, callback: Function) {
api[args.cmd] = callback
console.log(`api '${args.cmd}' success`)
}
export async function act(args: Pattern, isThrowError = false) {
if (!args.topic || !args.cmd) throw new Error('topic or cmd is null')
if (args.topic) args.topic = args.topic.toLowerCase()
if (args.cmd) args.cmd = args.cmd.toLowerCase()
const args_obj = _.pick(args, ['data', 'context']);
const body = JSON.stringify(args_obj)
if (args.topic === config.name) return api[args.cmd]({ ...args_obj });
const url = `http://service-${args.topic}${targetPort ? (':' + targetPort) : ''}/${args.cmd}/`
const start = Date.now()
let ret: any = await rp.post(url, { body, gzip: true, forever: true, headers: { 'Content-Type': 'application/json' } })
.then(body => JSON.parse(body))
.catch(e => { return { error: e.message } })
const hs = Date.now() - start
let spend_time = Math.floor(hs / 50)
if (spend_time > 99) spend_time = 99
const msg = `act--${url}|${body}|${JSON.stringify(ret)}|${hs}`
const opts = { method_name: args.cmd, spend_time: spend_time }
logger.debug(msg, opts)
if (ret) {
if ('error' in ret) {
logger.error(msg, opts)
if (isThrowError) throw new Error(ret.error)
}
if ('result' in ret) {
ret = ret.result
}
}
return ret;
}
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.act = exports.add = exports.init = void 0;
const fs = require("fs");
const _ = require("lodash");
const nats_1 = require("nats");
const path = require("path");
const Hemera = require('nats-hemera');
let hemera;
let logger;
function init(cfg, callback) {
logger = cfg.log ? new cfg.log(`${cfg.name}/hemera`) : console;
hemera = new Hemera((0, nats_1.connect)(cfg.nats), Object.assign({}, cfg.hemera));
logger.info('msbase hemera log init success');
try {
const HemeraZipkin = require('hemera-zipkin');
hemera.use(HemeraZipkin, {
host: process.env.ZIPKIN_URL,
port: process.env.ZIPKIN_PORT,
sampling: 1
});
}
catch (e) {
logger.info('zipkin未配置: ');
}
hemera.ready(() => __awaiter(this, void 0, void 0, function* () {
logger.log('hemera initialized');
callback(() => {
try {
_.forEach(cfg.apipath ? cfg.apipath : ['./build/services/'], requirefile);
}
catch (error) {
logger.log(error.message);
}
logger.log('api initialized');
});
}));
return hemera;
}
exports.init = init;
function requirefile(dir) {
fs.readdirSync(dir).forEach((file) => {
try {
console.info(`初始化微服务,扫描文件 ${path.resolve(dir + file)}`);
if (file.substr(-3) === '.js' && require(path.resolve(dir + file)))
return;
if (file.indexOf('.') === -1)
requirefile(`${dir}${file}/`);
}
catch (e) {
console.info(`微服务初始化异常 ${dir} ${file}`, e);
}
});
}
function add(args, callback) {
if (hemera) {
hemera.add(args, (req) => __awaiter(this, void 0, void 0, function* () { return callback(req).catch(e => { return { error: e.message }; }); }));
// hemera.add(args, async (req, cb) => {
// const ret = await callback(req);
// cb(null, ret);
// });
}
}
exports.add = add;
function act(args, isThrowError = false) {
return __awaiter(this, void 0, void 0, function* () {
const starttime = Date.now();
let localfn = hemera.router.lookup(args);
if (localfn)
localfn = localfn.action;
if (!args.context)
args.context = {};
let ret = yield (localfn ?
localfn(args)
: hemera.act(args)).catch((error) => ({ error: error.message, code: 1001 }));
if (ret && ret.context)
ret = ret.data;
if (ret && ret.error)
logger.error(ret.error);
if (isThrowError && ret && ret.error)
throw new Error(ret.error);
const total = Date.now() - starttime;
if (total > 200) {
// logger.log(`${JSON.stringify(args)} 调用时间耗时:${total}毫秒 ${JSON.stringify(ret)}`);
logger.log(`${JSON.stringify(args)} 调用时间耗时:${total}毫秒 `);
}
return ret;
});
}
exports.act = act;
//# sourceMappingURL=hemera.js.map
\ No newline at end of file \ No newline at end of file
"use strict";
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.act = exports.add = exports.init = void 0;
var adapter = require('./hemera');
var config = {};
function init(cfg, callback) {
switch (cfg.type) {
case 'fast':
adapter = require('./fast');
break;
default:
adapter = require('./hemera');
break;
}
config = cfg;
adapter.init(cfg, callback);
}
exports.init = init;
function add(args, callback) {
if (adapter) {
adapter.add(ckArgs(args), callback);
}
}
exports.add = add;
function act(args, isThrowError = false) {
return __awaiter(this, void 0, void 0, function* () {
return adapter.act(ckArgs(args), isThrowError);
});
}
exports.act = act;
function ckArgs(args) {
if (!args)
return;
if (!args.topic)
args.topic = config.name;
//if (!args.topic || !args.cmd) throw new Error('topic and cmd is null')
if (args.topic)
args.topic = args.topic.toLowerCase();
if (args.cmd)
args.cmd = args.cmd.toLowerCase();
return args;
}
//# sourceMappingURL=index.js.map
\ No newline at end of file \ No newline at end of file
var adapter: any = require('./hemera')
var config: any = {}
export function init(cfg: any, callback?: Function) {
switch (cfg.type) {
case 'fast':
adapter = require('./fast')
break;
default:
adapter = require('./hemera')
break;
}
config = cfg
adapter.init(cfg, callback)
}
interface Pattern {
pubsub$?: boolean
timeout$?: number
maxMessages$?: number
expectedMessages$?: number
[key: string]: any
}
export function add(args: Pattern, callback: Function) {
if (adapter) {
adapter.add(ckArgs(args), callback)
}
}
export async function act(args: Pattern, isThrowError = false) {
return adapter.act(ckArgs(args), isThrowError)
}
function ckArgs(args: Pattern) {
if (!args) return;
if (!args.topic) args.topic = config.name
//if (!args.topic || !args.cmd) throw new Error('topic and cmd is null')
if (args.topic) args.topic = args.topic.toLowerCase()
if (args.cmd) args.cmd = args.cmd.toLowerCase()
return args
}
import * as express from "express";
import { JobExecutor } from "./index";
import * as path from "path";
const config = {
appType:'express',
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq", // 注:开发环境建议加后缀区分
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml') //根目录下文件夹frp中的frpc.toml
};
var jobExecutor = new JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
})
//批量任务注册
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
]
]
jobExecutor.registerBatch(batchJob);
\ No newline at end of file \ No newline at end of file
import * as fs from 'fs';
import _ = require('lodash'); import _ = require('lodash');
import { connect } from 'nats'; import { connect } from 'nats';
import * as path from 'path';
const Hemera = require('nats-hemera'); const Hemera = require('nats-hemera');
let hemera; let hemera;
......
/** import { JobExecutor } from 'job-executor'
* XXL-JOB 执行器配置项
*/
interface XxlJobConfig {
/** 执行器唯一标识(建议开发环境用不同名称避免冲突) */
executorKey: string;
/** 调度中心地址 */
scheduleCenterUrl?: string;
/** 认证令牌 */
accessToken?: string;
/** 任务日志存储路径 */
jobLogPath?: string;
/** 是否启用调试日志 */
enableDebugLog?: boolean;
}
/** const config = require('../config');
* 执行器全局配置
*/
interface JobExecutorOptions {
/** Express/Koa 'express' | 'koa' */
appType: string;
/** 应用对外访问的域名(用于回调) */
appDomain: string;
/** 应用监听端口 */
port: number;
/** XXL-JOB 核心配置 */
xxlJob: XxlJobConfig;
/** frpc 配置文件路径(可选) */
frpcConfigPath?: string;
}
/** var jobExecutor = new JobExecutor(config);
* XXL-JOB 任务执行器
* 1. 注册任务处理器
* 2. 启动 HTTP 服务
* 3. 可选启动 frpc 内网穿透
*/
export class JobExecutor {
private jobHandlers = new Map<string, Function>();
constructor(opts: JobExecutorOptions) { jobExecutor.register("demoJobHandler", async (args) => {
// 初始化 frpc 内网穿透(如果配置存在)
if (opts.frpcConfigPath) {
require("./xxl-job/frpc").createFrpc(opts.frpcConfigPath);
}
// 连接 XXL-JOB 调度中心
const app = new (require("./xxl-job/index").XxlJobExecutor)(
opts.xxlJob,
this.jobHandlers
).applyMiddleware({
appType: opts.appType,
domain: opts.appDomain
});
// 启动 HTTP 服务 for (let i = 1; i < 10; i++) {
app.listen(opts.port, () => { await new Promise((resolve, reject) => {
console.log(`[XXL-JOB] Executor ${opts.appType} "${opts.xxlJob.executorKey}" is running`); setTimeout(resolve, 888);
console.log(`- Local: http://localhost:${opts.port}`); })
console.log(`- Public: ${opts.appDomain}`);
});
} }
/** return { result: 'return value' };
* 注册任务 })
* @param jobName 任务名称(需与调度中心配置一致)
* @param handler 任务处理函数,需返回 Promise
* @throws 如果任务已注册或 handler 不是函数
*/
public register(jobName: string, handler: Function) {
if (this.jobHandlers.has(jobName)) {
throw new Error(`Job handler "${jobName}" is already registered`);
}
if (typeof handler !== 'function') { //批量任务注册
throw new Error(`Job handler "${jobName}" must be a function`); const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
} }
console.log(`[XXL-JOB] Job "${jobName}" registered`); return { result: 'return value' };
// 包装 handler
this.jobHandlers.set(jobName, async (...args: any[]) => {
try {
return await handler(...args);
} catch (error) {
console.error(`[XXL-JOB] Job "${jobName}" failed:`, error);
return { error };
} }
}); ],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
} }
/** return { result: 'return value' };
* 批量注册任务
* @param handlers Array<[string, Function]> 任务处理函数,需返回 Promise
*
*/
public registerBatch(handlers: any[]) {
handlers.forEach((handler) => this.register(handler[0], handler[1]));
} }
]
]
/**
* 获取任务
* @param jobName 任务名称(需与调度中心配置一致)
*/
public getJobHandler(jobName: string) {
return this.jobHandlers.get(jobName);
}
}
\ No newline at end of file \ No newline at end of file
jobExecutor.registerBatch(batchJob);
\ No newline at end of file \ No newline at end of file
import { JobExecutor } from "./index";
import * as path from "path";
const config = {
appType: "koa",
appDomain: "http://10.0.1.92:17001/",
port: 8088,
xxlJob: {
executorKey: "job-ydn-zq", // 注:开发环境建议加后缀区分
scheduleCenterUrl: "http://xxljob.ydniu.com/xxl-job-admin",
accessToken: "default_token",
jobLogPath: "logs/job",
enableDebugLog: false,
},
frpcConfigPath: path.join(process.cwd(), 'frp/frpc.toml') //根目录下文件夹frp中的frpc.toml
};
var jobExecutor = new JobExecutor(config);
jobExecutor.register("demoJobHandler", async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
})
//批量任务注册
const batchJob = [
[
"demoJobHandler1",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
],
[
"demoJobHandler2",
async (args) => {
for (let i = 1; i < 10; i++) {
await new Promise((resolve, reject) => {
setTimeout(resolve, 888);
})
}
return { result: 'return value' };
}
]
]
jobExecutor.registerBatch(batchJob);
\ No newline at end of file \ No newline at end of file
export function createFrpc(configPath: string) {
const { spawn } = require("child_process")
const path = require('path')
console.log("启动 frpc ...")
// 1. 启动 frpc 进程
const frpc = spawn(path.join(__dirname, '..', '..', 'frp', 'frpc'), ["-c", configPath], {
stdio: "inherit", // 共享输入输出
});
// 2. 监听 Node.js 退出事件并关闭 frpc
function cleanup() {
if (!frpc.killed) {
frpc.kill("SIGTERM"); // 发送终止信号
}
}
// 捕获各种退出信号
process.on("exit", cleanup);
process.on("SIGINT", () => {
cleanup();
process.exit();
});
process.on("SIGTERM", () => {
cleanup();
process.exit();
});
// 3. 监听 frpc 的退出
frpc.on("exit", (code) => {
console.log(`frpc 已退出,代码: ${code}`);
});
}
import { Executor } from './executor';
/**
* XxlJobExecutor
*/
export class XxlJobExecutor {
private executor
private registryInterval
/**
* 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
*/
constructor(opts, jobHandlers) {
const {
executorKey,
scheduleCenterUrl,
accessToken,
jobLogPath,
} = opts;
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers)
}
/**
* 应用执行器组件
* @param {Object} args
* @param {any} args.appType 执行器server, express
* @param {string} args.appDomain 执行器 server 地址
*/
public applyMiddleware({ appType, domain }) {
const app = this.executor.applyMiddleware({ appType, domain })
const registry = this.executor.registry.bind(this.executor)
registry() && (this.registryInterval = setInterval(registry, 30000))
return app
}
/**
* 关闭服务前应调用该方法,将执行器从调度中心摘除
*/
async close() {
if (this.registryInterval) {
clearInterval(this.registryInterval)
}
return this.executor.registryRemove()
}
}
\ No newline at end of file \ No newline at end of file
/**
* 任务管理系统 - 用于管理并发任务,支持日志记录和超时控制
*/
export class JobManager {
private runningJobs; // 存储正在运行的任务ID集合
private jobLogPath; // 任务日志存储路径
/**
* 构造函数
* @param {string} jobLogPath - 任务日志存储路径
*/
constructor(jobLogPath) {
this.jobLogPath = jobLogPath;
this.runningJobs = new Set(); // 使用Set存储运行中任务ID,保证唯一性
}
/**
* 根据执行时间获取日志文件路径
* @param {number} dateTime - 任务执行时间戳
* @return {string} - 日志文件完整路径
*/
getLogFilePath(dateTime) {
// 待实现:根据时间戳生成日志文件路径
// 示例:按日期分目录存储
}
/**
* 生成任务日志器的命名空间
* @param {string} handlerName - 任务处理器名称
* @param {number} dateTime - 任务执行时间戳
* @param {number} logId - 日志唯一ID
* @return {string} - 日志器命名空间字符串
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
// 待实现:生成格式化的日志命名空间
// 示例:`job:${handlerName}:${dateTime}:${logId}`
}
/**
* 检查任务是否正在运行
* @param {number} jobId - 任务ID
* @return {boolean} - 是否存在该运行中任务
*/
hasJob(jobId) {
return this.runningJobs.has(jobId);
}
/**
* 执行任务
* @param {number} jobId - 任务ID
* @param {string} jobJsonParams - 任务参数(JSON字符串格式)
* @param {number} logId - 日志ID
* @param {number} logDateTime - 日志时间戳
* @param {number} executorTimeout - 执行超时时间(秒)
* @param {string} handlerName - 处理器名称
* @param {function} jobHandler - 任务处理函数
* @param {function} callback - 任务完成回调函数
*/
async runJob(
jobId,
jobJsonParams,
logId,
logDateTime,
executorTimeout,
handlerName,
jobHandler,
callback
) {
// 1. 初始化日志记录器(待实现)
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let result, timeoutTimer, error;
const startTime = Date.now();
try {
// 检查是否有重复任务
if (this.hasJob(jobId)) {
throw new Error('已有相同任务正在运行');
}
this.runningJobs.add(jobId);
// 解析任务参数
let jobParams = {};
try {
jobParams = JSON.parse(jobJsonParams) || {};
} catch { }
// 设置超时定时器
if (executorTimeout) {
timeoutTimer = setTimeout(
() => this.finishJob({
jobId,
logId,
callback,
timeoutTimer: null,
use_time: Date.now() - startTime,
result: null,
error: new Error('任务执行超时')
}),
executorTimeout * 1000
);
}
// 执行任务处理函数
result = await jobHandler(jobParams);
// 处理可能存在的错误结果
if (result && result.error) {
error = result.error;
delete result.error;
if (Object.keys(result).length === 0) {
result = undefined;
}
}
} catch (err) {
error = err;
} finally {
await this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: Date.now() - startTime,
result,
error
});
}
}
/**
* 读取任务日志
* @param {number} logDateTime - 日志时间戳
* @param {number} logId - 日志ID
* @return {Promise<Array>} - 日志内容数组
*/
async readJobLog(logDateTime, logId) {
// 待实现:读取指定日志文件内容
}
/**
* 完成任务处理
* @param {Object} params - 任务完成参数对象
* @param {number} params.jobId - 任务ID
* @param {number} params.logId - 日志ID
* @param {function} params.callback - 回调函数
* @param {number} params.timeoutTimer - 超时定时器
* @param {number} params.use_time - 任务耗时(毫秒)
* @param {*} params.result - 任务结果
* @param {Error} params.error - 任务错误
*/
async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
try {
// 清除超时定时器
timeoutTimer && clearTimeout(timeoutTimer);
// 执行回调
await callback({ error, logId, use_time, result });
} catch (err) {
console.error(`完成任务时发生错误: ${err.message}`);
} finally {
// 无论成功与否,都从运行集合中移除
this.runningJobs.delete(jobId);
}
}
}
\ No newline at end of file \ No newline at end of file
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!