Commit f1ce9450 Harvey

no message

1 个父辈 37bce485
...@@ -6,7 +6,7 @@ const index_1 = require("./job/index"); ...@@ -6,7 +6,7 @@ const index_1 = require("./job/index");
const index_2 = require("./xxl_job/index"); const index_2 = require("./xxl_job/index");
const app = express(); const app = express();
app.use(require('body-parser').json()); app.use(require('body-parser').json());
new index_2.XxlJobExecutor(config.xxl_job, index_1.job_handlers).applyMiddleware({ app, appDomain: config.app_domain, path: '' }); new index_2.XxlJobExecutor(config.xxl_job, index_1.job_handlers).applyMiddleware({ app, domain: config.app_domain, path: '' });
app.listen(config.port, () => { app.listen(config.port, () => {
console.log(`job-ydn-zq app listening on port ${config.port}`); console.log(`job-ydn-zq app listening on port ${config.port}`);
}); });
......
...@@ -111,12 +111,10 @@ class Executor { ...@@ -111,12 +111,10 @@ class Executor {
* @return {{code: number, msg: string}} * @return {{code: number, msg: string}}
*/ */
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) { run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName); const jobHandler = this.jobHandlers.get(handlerName);
if (!jobHandler) { if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` }; return { code: 500, msg: `no matched jobHandler(${handlerName})` };
} }
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this)); this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this));
return { code: 200, msg: 'success' }; return { code: 200, msg: 'success' };
} }
...@@ -158,7 +156,7 @@ class Executor { ...@@ -158,7 +156,7 @@ class Executor {
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度 * 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/ */
async registry() { async registry() {
await axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, { return axios_1.default.post(`${this.scheduleCenterUrl}/api/registry`, {
registryGroup: 'EXECUTOR', registryGroup: 'EXECUTOR',
registryKey: this.executorKey, registryKey: this.executorKey,
registryValue: this.executorUrl registryValue: this.executorUrl
...@@ -173,7 +171,7 @@ class Executor { ...@@ -173,7 +171,7 @@ class Executor {
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行 * 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/ */
async registryRemove() { async registryRemove() {
await axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, { return axios_1.default.post(`${this.scheduleCenterUrl}/api/registryRemove`, {
'registryGroup': 'EXECUTOR', 'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey, 'registryKey': this.executorKey,
'registryValue': this.executorUrl 'registryValue': this.executorUrl
......
"use strict"; "use strict";
Object.defineProperty(exports, "__esModule", { value: true }); Object.defineProperty(exports, "__esModule", { value: true });
exports.XxlJobExecutor = void 0; exports.XxlJobExecutor = void 0;
const Executor = require("./executor"); const executor_1 = require("./executor");
const { isNilOrEmpty, notEmpty } = require('./src/purefuncs');
/** /**
* XxlJobExecutor * XxlJobExecutor
*/ */
class XxlJobExecutor { class XxlJobExecutor {
executor; executor;
registryInterval;
/** /**
* 创建 XxlJobExecutor 实例 * 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数 * @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
* @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis } * @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis }
*/ */
constructor(opts, jobHandlers) { constructor(opts, jobHandlers, context = undefined) {
const { XXL_JOB_EXECUTOR_KEY: executorKey, XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl, XXL_JOB_ACCESS_TOKEN: accessToken, XXL_JOB_JOB_LOG_PATH: jobLogPath, } = opts; const { XXL_JOB_EXECUTOR_KEY: executorKey, XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl, XXL_JOB_ACCESS_TOKEN: accessToken, XXL_JOB_JOB_LOG_PATH: jobLogPath, } = opts;
const parameters = { executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers }; this.executor = new executor_1.Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context);
// const invalidParameters = Object.entries(parameters).filter(([, value]) => isNilOrEmpty(value))
// if (notEmpty(invalidParameters)) throw `invalid parameter: ${invalidParameters.map(([key]) => key).join(',')}`
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers);
} }
/** /**
* 应用执行器组件 * 应用执行器组件
* @param {Object} args * @param {Object} args
* @param {any} args.app 执行器server, express * @param {any} args.app 执行器server, express
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com * @param {string} args.appDomain 执行器 server 地址
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job * @param {string} args.path 执行器挂载的 uri 路径
*/ */
applyMiddleware({ app, domain, path: uri }) { applyMiddleware({ app, domain, path }) {
this.executor.applyMiddleware({ app, domain, uri }); this.executor.applyMiddleware({ app, domain, path });
const registry = this.executor.registry.bind(this.executor); const registry = this.executor.registry.bind(this.executor);
registry() && setInterval(registry, 30000); registry() && (this.registryInterval = setInterval(registry, 30000));
} }
/** /**
* 关闭服务前应调用该方法,将执行器从调度中心摘除 * 关闭服务前应调用该方法,将执行器从调度中心摘除
*/ */
async close() { async close() {
await this.executor.registryRemove(); if (this.registryInterval) {
clearInterval(this.registryInterval);
}
return this.executor.registryRemove();
} }
} }
exports.XxlJobExecutor = XxlJobExecutor; exports.XxlJobExecutor = XxlJobExecutor;
......
...@@ -7,7 +7,7 @@ import { XxlJobExecutor } from './xxl_job/index' ...@@ -7,7 +7,7 @@ import { XxlJobExecutor } from './xxl_job/index'
const app = express() const app = express()
app.use(require('body-parser').json()) app.use(require('body-parser').json())
new XxlJobExecutor(config.xxl_job, job_handlers).applyMiddleware({ app, appDomain: config.app_domain, path: '' }) new XxlJobExecutor(config.xxl_job, job_handlers).applyMiddleware({ app, domain: config.app_domain, path: '' })
app.listen(config.port, () => { app.listen(config.port, () => {
......
...@@ -6,12 +6,12 @@ import axios from 'axios'; ...@@ -6,12 +6,12 @@ import axios from 'axios';
export class Executor { export class Executor {
executorKey private executorKey
scheduleCenterUrl private scheduleCenterUrl
accessToken private accessToken
executorUrl private executorUrl
jobHandlers private jobHandlers
jobManager private jobManager
/** /**
* @param {string} executorKey * @param {string} executorKey
...@@ -131,12 +131,12 @@ export class Executor { ...@@ -131,12 +131,12 @@ export class Executor {
* @return {{code: number, msg: string}} * @return {{code: number, msg: string}}
*/ */
run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) { run({ jobId, executorHandler: handlerName, executorParams: jobJsonParams, executorTimeout, logId, logDateTime }) {
// check executorHandler
const jobHandler = this.jobHandlers.get(handlerName) const jobHandler = this.jobHandlers.get(handlerName)
if (!jobHandler) { if (!jobHandler) {
return { code: 500, msg: `no matched jobHandler(${handlerName})` } return { code: 500, msg: `no matched jobHandler(${handlerName})` }
} }
// execute job
this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this)) this.jobManager.runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, this.callback.bind(this))
return { code: 200, msg: 'success' } return { code: 200, msg: 'success' }
...@@ -180,7 +180,7 @@ export class Executor { ...@@ -180,7 +180,7 @@ export class Executor {
* 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度 * 执行器注册:执行器注册时使用,调度中心会实时感知注册成功的执行器并发起任务调度
*/ */
async registry() { async registry() {
await axios.post(`${this.scheduleCenterUrl}/api/registry`, return axios.post(`${this.scheduleCenterUrl}/api/registry`,
{ {
registryGroup: 'EXECUTOR', registryGroup: 'EXECUTOR',
registryKey: this.executorKey, registryKey: this.executorKey,
...@@ -198,7 +198,7 @@ export class Executor { ...@@ -198,7 +198,7 @@ export class Executor {
* 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行 * 执行器注册摘除:执行器注册摘除时使用,注册摘除后的执行器不参与任务调度与执行
*/ */
async registryRemove() { async registryRemove() {
await axios.post(`${this.scheduleCenterUrl}/api/registryRemove`, return axios.post(`${this.scheduleCenterUrl}/api/registryRemove`,
{ {
'registryGroup': 'EXECUTOR', 'registryGroup': 'EXECUTOR',
'registryKey': this.executorKey, 'registryKey': this.executorKey,
......
import * as Executor from './executor'; import { Executor } from './executor';
const { isNilOrEmpty, notEmpty } = require('./src/purefuncs')
/** /**
* XxlJobExecutor * XxlJobExecutor
*/ */
export class XxlJobExecutor { export class XxlJobExecutor {
executor; private executor
private registryInterval
/** /**
* 创建 XxlJobExecutor 实例 * 创建 XxlJobExecutor 实例
* @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数 * @param {Map<String, Function>} jobHandlers 所有的任务执行函数,key: 任务标识,即调度中心任务配置的JobHandler;value: 任务执行函数
* @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis } * @param {Object} context 为所有任务执行函数指定公共的上下文对象,常见比如数据库实例 { database, redis }
*/ */
constructor(opts, jobHandlers) { constructor(opts, jobHandlers, context = undefined) {
const { const {
XXL_JOB_EXECUTOR_KEY: executorKey, XXL_JOB_EXECUTOR_KEY: executorKey,
XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl, XXL_JOB_SCHEDULE_CENTER_URL: scheduleCenterUrl,
XXL_JOB_ACCESS_TOKEN: accessToken, XXL_JOB_ACCESS_TOKEN: accessToken,
XXL_JOB_JOB_LOG_PATH: jobLogPath, XXL_JOB_JOB_LOG_PATH: jobLogPath,
} = opts; } = opts;
const parameters = { executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers }
// const invalidParameters = Object.entries(parameters).filter(([, value]) => isNilOrEmpty(value)) this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers, context)
// if (notEmpty(invalidParameters)) throw `invalid parameter: ${invalidParameters.map(([key]) => key).join(',')}`
this.executor = new Executor(executorKey, scheduleCenterUrl, accessToken, jobLogPath, jobHandlers)
} }
/** /**
* 应用执行器组件 * 应用执行器组件
* @param {Object} args * @param {Object} args
* @param {any} args.app 执行器server, express * @param {any} args.app 执行器server, express
* @param {string} args.appDomain 执行器 server 地址,eg: http://server-api.com * @param {string} args.appDomain 执行器 server 地址
* @param {string} args.path 执行器挂载的 uri 路径,eg: /job * @param {string} args.path 执行器挂载的 uri 路径
*/ */
applyMiddleware({ app, domain, path: uri }) { public applyMiddleware({ app, domain, path }) {
this.executor.applyMiddleware({ app, domain, uri }) this.executor.applyMiddleware({ app, domain, path })
const registry = this.executor.registry.bind(this.executor) const registry = this.executor.registry.bind(this.executor)
registry() && setInterval(registry, 30000) registry() && (this.registryInterval = setInterval(registry, 30000))
} }
/** /**
* 关闭服务前应调用该方法,将执行器从调度中心摘除 * 关闭服务前应调用该方法,将执行器从调度中心摘除
*/ */
async close() { async close() {
await this.executor.registryRemove() if (this.registryInterval) {
clearInterval(this.registryInterval)
}
return this.executor.registryRemove()
} }
} }
\ No newline at end of file \ No newline at end of file
const Axios = require('axios')
const FC = require('folktale/concurrency')
const R = require('ramda')
const always = R.always
const anyPass = R.anyPass
const last = R.last
const compose = R.compose
const pick = R.pick
const propOr = R.propOr
const path = R.path
const pathOr = R.pathOr
const reject = R.reject
const tap = R.tap
const not = R.not
const isNil = R.isNil
const isEmpty = R.isEmpty
const Task = FC.task
const notEmpty = compose(not, isEmpty)
const omitNil = reject(isNil)
const isNilOrEmpty = anyPass([isNil, isEmpty])
const tapTask = (f) => compose(Task.of, tap(f))
const postTask = Task.fromPromised(Axios.post)
module.exports = {
always, last, compose, pick, propOr, path, pathOr, tap, not, isNil, isEmpty, Task, notEmpty, omitNil,
isNilOrEmpty, tapTask, postTask,
}
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!