Skip to content
切换导航条
切换导航条
当前项目
正在载入...
登录
Harvey
/
job-executor
转到一个项目
切换导航栏
切换导航栏固定状态
项目
群组
代码片段
帮助
项目
活动
版本库
流水线
图表
问题
0
合并请求
0
维基
网络
创建新的问题
作业
提交
问题看板
文件
提交
网络
比较
分支
标签
Commit ee207e6a
由
Harvey
编写于
2025-04-28 17:04:22 +0800
浏览文件
选项
浏览文件
标签
下载
电子邮件补丁
差异文件
no message
1 个父辈
aa2bfd71
隐藏空白字符变更
内嵌
并排
正在显示
10 个修改的文件
包含
436 行增加
和
365 行删除
build/job/index.js
build/xxl_job/executor.js
build/xxl_job/job-manager.js → build/xxl_job/job_manager.js
build/xxl_job/logger.js
build/xxl_job/purefuncs.js
package.json
src/job/index.ts
src/xxl_job/executor.ts
src/xxl_job/job-manager.ts → src/xxl_job/job_manager.ts
src/xxl_job/logger.ts
build/job/index.js
查看文件 @
ee207e6
...
...
@@ -3,4 +3,13 @@ Object.defineProperty(exports, "__esModule", { value: true });
exports
.
job_handlers
=
void
0
;
const
job_handlers
=
new
Map
();
exports
.
job_handlers
=
job_handlers
;
const
common_1
=
require
(
"../libs/common"
);
async
function
demoJobHandler
(
jobLogger
,
jobParams
,
context
)
{
jobLogger
.
debug
(
'params: %o, context: %o'
,
jobParams
,
context
);
for
(
let
i
=
1
;
i
<
10
;
i
++
)
{
await
(
0
,
common_1
.
sleep
)(
1000
);
jobLogger
.
debug
(
`
${
i
}
s passed`
);
}
}
job_handlers
.
set
(
'demoJobHandler'
,
demoJobHandler
);
//# sourceMappingURL=index.js.map
\ No newline at end of file
build/xxl_job/executor.js
查看文件 @
ee207e6
"use strict"
;
Object
.
defineProperty
(
exports
,
"__esModule"
,
{
value
:
true
});
exports
.
Executor
=
void
0
;
const
JobManager
=
require
(
'./job-manager'
);
const
logger
=
require
(
'./logger'
);
const
log
=
logger
(
'xxl-job-executor'
);
const
job_manager_1
=
require
(
"./job_manager"
);
// const logger = require('./logger')
// const log = logger('xxl-job-executor')
const
axios_1
=
require
(
"axios"
);
class
Executor
{
executorKey
;
...
...
@@ -25,7 +25,7 @@ class Executor {
this
.
scheduleCenterUrl
=
scheduleCenterUrl
;
this
.
accessToken
=
accessToken
;
this
.
jobHandlers
=
jobHandlers
;
this
.
jobManager
=
new
JobManager
(
jobLogPath
,
context
);
this
.
jobManager
=
new
job_manager_1
.
JobManager
(
jobLogPath
,
context
);
}
/**
* 应用执行器中间件
...
...
@@ -223,7 +223,7 @@ class Executor {
'Content-Type'
:
'application/json'
}
}).
then
((
response
)
=>
response
.
data
).
catch
((
err
)
=>
{
log
.
error
(
`callback error:
${
JSON
.
stringify
(
result
)}
${
err
.
message
}
`
);
console
.
error
(
`callback error:
${
JSON
.
stringify
(
result
)}
${
err
.
message
}
`
);
});
}
}
...
...
build/xxl_job/job
-
manager.js
→
build/xxl_job/job
_
manager.js
查看文件 @
ee207e6
const
fs
=
require
(
'fs'
);
const
moment
=
require
(
'moment'
);
const
Path
=
require
(
'path'
);
const
logger
=
require
(
'./logger'
);
const
{
Task
,
tapTask
}
=
require
(
'./purefuncs'
);
const
{
mkdir
,
searchInFile
}
=
require
(
'./file'
);
const
log
=
logger
(
'xxl-job-manager'
);
"use strict"
;
Object
.
defineProperty
(
exports
,
"__esModule"
,
{
value
:
true
});
exports
.
JobManager
=
void
0
;
const
moment
=
require
(
"moment"
);
const
Path
=
require
(
"path"
);
// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')
/**
* 任务管理
*/
class
JobManager
{
context
;
runningJobs
;
jobLogPath
;
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor
(
jobLogPath
,
context
)
{
mkdir
(
jobLogPath
);
// mkdir(jobLogPath)
this
.
jobLogPath
=
jobLogPath
;
this
.
context
=
context
;
this
.
runningJobs
=
new
Set
();
...
...
@@ -44,39 +50,55 @@ class JobManager {
hasJob
(
jobId
)
{
return
this
.
runningJobs
.
has
(
jobId
);
}
/**
* @param {number} jobId
* @param {string} jobJsonParams
* @param {number} logId
* @param {number} logDateTime
* @param {number} executorTimeout
* @param {string} handlerName
* @param {function} jobHandler
* @param {function} callback
*/
runJob
(
jobId
,
jobJsonParams
,
logId
,
logDateTime
,
executorTimeout
,
handlerName
,
jobHandler
,
callback
)
{
let
timeout
=
undefined
;
const
logNameSpace
=
this
.
getJobLoggerNamespace
(
handlerName
,
logDateTime
,
logId
);
const
logFilePath
=
this
.
getLogFilePath
(
logDateTime
);
const
jobLogger
=
logger
(
logNameSpace
,
logFilePath
);
Task
.
of
(
jobJsonParams
)
.
chain
((
jobJsonParams
)
=>
Task
.
of
(
jobJsonParams
?
JSON
.
parse
(
jobJsonParams
)
:
{}))
.
chain
((
jobParams
)
=>
{
jobLogger
.
trace
(
'start'
);
// check duplicate job
if
(
this
.
hasJob
(
jobId
))
return
Task
.
rejected
(
'There is already have a same job is running'
);
async
runJob
(
jobId
,
// 任务ID
jobJsonParams
,
// 任务参数(JSON格式)
logId
,
// 日志ID
logDateTime
,
// 日志时间
executorTimeout
,
// 执行超时时间(秒)
handlerName
,
// 处理器名称
jobHandler
,
// 任务处理函数
callback
// 回调函数
)
{
// 1. 初始化日志记录器
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let
result
,
timeoutTimer
,
error
;
try
{
if
(
this
.
hasJob
(
jobId
))
{
throw
new
Error
(
'已有相同任务正在运行'
);
}
this
.
runningJobs
.
add
(
jobId
);
// setup timeout
let
jobParams
;
try
{
jobParams
=
JSON
.
parse
(
jobJsonParams
);
}
catch
{
jobJsonParams
=
{};
}
if
(
executorTimeout
)
{
timeout
=
setTimeout
(
async
()
=>
await
this
.
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
error
:
new
Error
(
'timeout'
)
}),
executorTimeout
*
1000
);
timeoutTimer
=
setTimeout
(()
=>
this
.
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
:
null
,
error
:
new
Error
(
'任务执行超时'
)
}),
executorTimeout
*
1000
);
}
return
Task
.
fromPromised
(
jobHandler
)(
jobLogger
,
jobParams
,
this
.
context
);
})
.
chain
((
result
)
=>
Task
.
of
({
result
}))
.
orElse
((
error
)
=>
Task
.
of
({
error
}))
.
chain
(
tapTask
(
async
({
result
,
error
})
=>
await
this
.
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
result
,
error
})))
.
run
().
promise
();
result
=
await
jobHandler
(
jobParams
,
this
.
context
);
}
catch
(
error
)
{
error
=
error
;
}
await
this
.
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
,
error
});
}
/**
* @param {number} logDateTime
...
...
@@ -98,20 +120,18 @@ class JobManager {
* @param {*} error
* @return {Promise<void>}
*/
async
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
result
,
error
})
{
async
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
,
error
})
{
try
{
timeout
&&
clearTimeout
(
timeout
);
result
&&
jobLogger
.
trace
(
'result: %o'
,
result
);
error
&&
jobLogger
.
err
(
'error: %o'
,
error
.
message
||
error
);
jobLogger
.
trace
(
'end'
);
jobLogger
.
close
();
timeoutTimer
&&
clearTimeout
(
timeoutTimer
);
await
callback
(
error
,
{
logId
,
result
});
}
catch
(
err
)
{
log
.
err
(
'finishJob error: %o'
,
err
.
message
||
err
);
console
.
log
(
`finishJob error:
${
err
.
message
}
`
);
}
finally
{
this
.
runningJobs
.
delete
(
jobId
);
}
this
.
runningJobs
.
delete
(
jobId
);
}
}
module
.
exports
=
JobManager
;
//# sourceMappingURL=job-manager.js.map
\ No newline at end of file
exports
.
JobManager
=
JobManager
;
//# sourceMappingURL=job_manager.js.map
\ No newline at end of file
build/xxl_job/logger.js
查看文件 @
ee207e6
"use strict"
;
Object
.
defineProperty
(
exports
,
"__esModule"
,
{
value
:
true
});
exports
.
createLogger
=
void
0
;
const
debug_1
=
require
(
"debug"
);
const
fs_1
=
require
(
"fs"
);
const
os_1
=
require
(
"os"
);
const
util_1
=
require
(
"util"
);
// 配置解析
const
parseEnvBool
=
(
envVar
)
=>
{
const
value
=
process
.
env
[
envVar
]
||
''
;
return
/^
(
yes|on|true|enable|enabled|1
)
$/i
.
test
(
value
);
};
const
getEnvValue
=
(
envVar
,
defaultValue
)
=>
{
return
process
.
env
[
envVar
]
!==
undefined
?
process
.
env
[
envVar
]
:
defaultValue
;
};
// 默认配置
const
DEFAULT_LOG_LEVELS
=
'info:*,warn:*,error:*,debug:*,trace:*'
;
const
WRITE_STREAM_OPTIONS
=
{
flags
:
'a'
,
encoding
:
'utf8'
,
autoClose
:
true
,
emitClose
:
true
};
// 空日志函数
const
noop
=
(...
_args
)
=>
{
};
const
noopLogger
=
{
error
:
noop
,
info
:
noop
,
warn
:
noop
,
debug
:
noop
,
trace
:
noop
,
close
:
noop
};
// 创建基础日志函数
const
createBaseLoggers
=
(
namespace
)
=>
{
return
{
error
:
(
0
,
debug_1
.
default
)(
'error'
).
extend
(
namespace
),
info
:
(
0
,
debug_1
.
default
)(
'info'
).
extend
(
namespace
),
warn
:
(
0
,
debug_1
.
default
)(
'warn'
).
extend
(
namespace
),
debug
:
(
0
,
debug_1
.
default
)(
'debug'
).
extend
(
namespace
),
trace
:
(
0
,
debug_1
.
default
)(
'trace'
).
extend
(
namespace
)
};
};
// 配置日志级别
const
configureLogLevels
=
(
logger
,
enabledLevels
)
=>
{
Object
.
entries
(
logger
).
forEach
(([
level
])
=>
{
logger
[
level
].
enabled
=
enabledLevels
.
includes
(
level
);
});
};
// 创建文件日志写入器
const
createFileLogger
=
(
logFilePath
,
logger
,
enabledLevels
)
=>
{
const
writeStream
=
fs_1
.
default
.
createWriteStream
(
logFilePath
,
WRITE_STREAM_OPTIONS
);
const
logToFile
=
(...
args
)
=>
{
writeStream
.
write
(
`
${
util_1
.
default
.
format
(...
args
)}${
os_1
.
default
.
EOL
}
`
);
};
const
logToConsoleAndFile
=
(...
args
)
=>
{
const
message
=
util_1
.
default
.
format
(...
args
);
console
.
error
(
message
);
writeStream
.
write
(
`
${
message
}${
os_1
.
default
.
EOL
}
`
);
};
// 配置日志输出方式
Object
.
entries
(
logger
).
forEach
(([
level
,
logFn
])
=>
{
logFn
.
log
=
enabledLevels
.
includes
(
level
)
?
logToConsoleAndFile
:
logToFile
;
});
// 返回完整logger对象
return
{
...
logger
,
close
:
()
=>
{
writeStream
.
end
();
// 切换回仅控制台日志
Object
.
values
(
logger
).
forEach
((
logFn
)
=>
{
logFn
.
log
=
console
.
error
;
});
}
};
};
// 主导出函数
const
createLogger
=
(
options
)
=>
{
// 处理参数重载
const
normalizedOptions
=
typeof
options
===
'string'
?
{
namespace
:
options
}
:
options
;
const
{
namespace
,
logFilePath
,
enabledLevels
=
getEnvValue
(
'DEBUG'
,
DEFAULT_LOG_LEVELS
),
debugEnabled
=
parseEnvBool
(
'XXL_JOB_DEBUG_LOG'
)
}
=
normalizedOptions
;
if
(
!
debugEnabled
&&
!
logFilePath
)
{
return
noopLogger
;
}
const
logger
=
createBaseLoggers
(
namespace
);
// 配置日志级别
configureLogLevels
(
logger
,
enabledLevels
);
if
(
!
logFilePath
)
{
return
{
...
logger
,
close
:
noop
};
}
// 文件日志配置
return
createFileLogger
(
logFilePath
,
logger
,
enabledLevels
);
};
exports
.
createLogger
=
createLogger
;
// 默认导出
exports
.
default
=
exports
.
createLogger
;
// import debug from 'debug';
// import fs from 'fs';
// import os from 'os';
// import util from 'util';
// // 类型定义
// type LogLevel = 'error' | 'info' | 'warn' | 'debug' | 'trace';
// type LoggerMethods = {
// [key in LogLevel]: debug.Debugger;
// } & {
// close: () => void;
// };
// interface LoggerOptions {
// namespace: string;
// logFilePath?: string;
// enabledLevels?: string;
// debugEnabled?: boolean;
// }
// // 配置解析
// const parseEnvBool = (envVar: string): boolean => {
// const value = process.env[envVar] || '';
// return /^(yes|on|true|enable|enabled|1)$/i.test(value);
// };
// const getEnvValue = (envVar: string, defaultValue: string): string => {
// return process.env[envVar] !== undefined ? process.env[envVar] : defaultValue;
// };
// // 默认配置
// const DEFAULT_LOG_LEVELS = 'info:*,warn:*,error:*,debug:*,trace:*';
// const WRITE_STREAM_OPTIONS: fs.WriteStreamOptions = {
// flags: 'a',
// encoding: 'utf8',
// autoClose: true,
// emitClose: true
// };
// // 空日志函数
// const noop = (..._args: any[]): void => {};
// const noopLogger: LoggerMethods = {
// error: noop,
// info: noop,
// warn: noop,
// debug: noop,
// trace: noop,
// close: noop
// };
// // 创建基础日志函数
// const createBaseLoggers = (namespace: string): Omit<LoggerMethods, 'close'> => {
// return {
// error: debug('error').extend(namespace),
// info: debug('info').extend(namespace),
// warn: debug('warn').extend(namespace),
// debug: debug('debug').extend(namespace),
// trace: debug('trace').extend(namespace)
// };
// };
// // 配置日志级别
// const configureLogLevels = (
// logger: Omit<LoggerMethods, 'close'>,
// enabledLevels: string
// ): void => {
// Object.entries(logger).forEach(([level]) => {
// logger[level as LogLevel].enabled = enabledLevels.includes(level);
// });
// };
// // 创建文件日志写入器
// const createFileLogger = (
// logFilePath: string,
// logger: Omit<LoggerMethods, 'close'>,
// enabledLevels: string
// ): LoggerMethods => {
// const writeStream = fs.createWriteStream(logFilePath, WRITE_STREAM_OPTIONS);
// const logToFile = (...args: any[]): void => {
// writeStream.write(`${util.format(...args)}${os.EOL}`);
// };
// const logToConsoleAndFile = (...args: any[]): void => {
// const message = util.format(...args);
// console.error(message);
// writeStream.write(`${message}${os.EOL}`);
// };
// // 配置日志输出方式
// Object.entries(logger).forEach(([level, logFn]) => {
// logFn.log = enabledLevels.includes(level) ? logToConsoleAndFile : logToFile;
// });
// // 返回完整logger对象
// return {
// ...logger,
// close: (): void => {
// writeStream.end();
// // 切换回仅控制台日志
// Object.values(logger).forEach((logFn) => {
// logFn.log = console.error;
// });
// }
// };
// };
// // 主导出函数
// export const createLogger = (
// options: LoggerOptions | string
// ): LoggerMethods => {
// // 处理参数重载
// const normalizedOptions = typeof options === 'string'
// ? { namespace: options }
// : options;
// const {
// namespace,
// logFilePath,
// enabledLevels = getEnvValue('DEBUG', DEFAULT_LOG_LEVELS),
// debugEnabled = parseEnvBool('XXL_JOB_DEBUG_LOG')
// } = normalizedOptions;
// if (!debugEnabled && !logFilePath) {
// return noopLogger;
// }
// const logger = createBaseLoggers(namespace);
// // 配置日志级别
// configureLogLevels(logger, enabledLevels);
// if (!logFilePath) {
// return {
// ...logger,
// close: noop
// };
// }
// // 文件日志配置
// return createFileLogger(logFilePath, logger, enabledLevels);
// };
// // 默认导出
// export default createLogger;
//# sourceMappingURL=logger.js.map
\ No newline at end of file
build/xxl_job/purefuncs.js
deleted
100644 → 0
查看文件 @
aa2bfd7
const
Axios
=
require
(
'axios'
);
const
FC
=
require
(
'folktale/concurrency'
);
const
R
=
require
(
'ramda'
);
const
always
=
R
.
always
;
const
anyPass
=
R
.
anyPass
;
const
last
=
R
.
last
;
const
compose
=
R
.
compose
;
const
pick
=
R
.
pick
;
const
propOr
=
R
.
propOr
;
const
path
=
R
.
path
;
const
pathOr
=
R
.
pathOr
;
const
reject
=
R
.
reject
;
const
tap
=
R
.
tap
;
const
not
=
R
.
not
;
const
isNil
=
R
.
isNil
;
const
isEmpty
=
R
.
isEmpty
;
const
Task
=
FC
.
task
;
const
notEmpty
=
compose
(
not
,
isEmpty
);
const
omitNil
=
reject
(
isNil
);
const
isNilOrEmpty
=
anyPass
([
isNil
,
isEmpty
]);
const
tapTask
=
(
f
)
=>
compose
(
Task
.
of
,
tap
(
f
));
const
postTask
=
Task
.
fromPromised
(
Axios
.
post
);
module
.
exports
=
{
always
,
last
,
compose
,
pick
,
propOr
,
path
,
pathOr
,
tap
,
not
,
isNil
,
isEmpty
,
Task
,
notEmpty
,
omitNil
,
isNilOrEmpty
,
tapTask
,
postTask
,
};
//# sourceMappingURL=purefuncs.js.map
\ No newline at end of file
package.json
查看文件 @
ee207e6
...
...
@@ -13,7 +13,8 @@
"dependencies"
:
{
"axios"
:
"^1.9.0"
,
"body-parser"
:
"^2.2.0"
,
"express"
:
"^5.1.0"
"express"
:
"^5.1.0"
,
"moment"
:
"^2.30.1"
},
"devDependencies"
:
{
"@types/node"
:
"^10.17.60"
...
...
src/job/index.ts
查看文件 @
ee207e6
const
job_handlers
=
new
Map
();
import
{
sleep
}
from
'../libs/common'
async
function
demoJobHandler
(
jobLogger
,
jobParams
,
context
)
{
jobLogger
.
debug
(
'params: %o, context: %o'
,
jobParams
,
context
)
for
(
let
i
=
1
;
i
<
10
;
i
++
)
{
await
sleep
(
1000
)
jobLogger
.
debug
(
`
${
i
}
s passed`
)
}
}
job_handlers
.
set
(
'demoJobHandler'
,
demoJobHandler
)
export
{
job_handlers
}
\ No newline at end of file
src/xxl_job/executor.ts
查看文件 @
ee207e6
const
JobManager
=
require
(
'./job-manager'
)
const
logger
=
require
(
'./logger'
)
const
log
=
logger
(
'xxl-job-executor'
)
import
{
JobManager
}
from
'./job_manager'
// const logger = require('./logger')
// const log = logger('xxl-job-executor')
import
axios
from
'axios'
;
export
class
Executor
{
...
...
@@ -262,7 +263,7 @@ export class Executor {
'Content-Type'
:
'application/json'
}
}).
then
((
response
)
=>
response
.
data
).
catch
((
err
)
=>
{
log
.
error
(
`callback error:
${
JSON
.
stringify
(
result
)}
${
err
.
message
}
`
);
console
.
error
(
`callback error:
${
JSON
.
stringify
(
result
)}
${
err
.
message
}
`
);
})
}
}
src/xxl_job/job
-
manager.ts
→
src/xxl_job/job
_
manager.ts
查看文件 @
ee207e6
const
fs
=
require
(
'fs'
)
const
moment
=
require
(
'moment'
)
const
Path
=
require
(
'path'
)
const
logger
=
require
(
'./logger'
)
const
{
Task
,
tapTask
}
=
require
(
'./purefuncs'
)
const
{
mkdir
,
searchInFile
}
=
require
(
'./file'
)
import
*
as
moment
from
'moment'
import
*
as
Path
from
'path'
// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')
const
log
=
logger
(
'xxl-job-manager'
)
/**
* 任务管理
*/
class
JobManager
{
export
class
JobManager
{
context
runningJobs
jobLogPath
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor
(
jobLogPath
,
context
)
{
mkdir
(
jobLogPath
)
//
mkdir(jobLogPath)
this
.
jobLogPath
=
jobLogPath
this
.
context
=
context
this
.
runningJobs
=
new
Set
()
...
...
@@ -50,41 +55,69 @@ class JobManager {
return
this
.
runningJobs
.
has
(
jobId
)
}
/**
* @param {number} jobId
* @param {string} jobJsonParams
* @param {number} logId
* @param {number} logDateTime
* @param {number} executorTimeout
* @param {string} handlerName
* @param {function} jobHandler
* @param {function} callback
*/
runJob
(
jobId
,
jobJsonParams
,
logId
,
logDateTime
,
executorTimeout
,
handlerName
,
jobHandler
,
callback
)
{
let
timeout
=
undefined
const
logNameSpace
=
this
.
getJobLoggerNamespace
(
handlerName
,
logDateTime
,
logId
)
const
logFilePath
=
this
.
getLogFilePath
(
logDateTime
)
const
jobLogger
=
logger
(
logNameSpace
,
logFilePath
)
Task
.
of
(
jobJsonParams
)
.
chain
((
jobJsonParams
)
=>
Task
.
of
(
jobJsonParams
?
JSON
.
parse
(
jobJsonParams
)
:
{}))
.
chain
((
jobParams
)
=>
{
jobLogger
.
trace
(
'start'
)
// check duplicate job
if
(
this
.
hasJob
(
jobId
))
return
Task
.
rejected
(
'There is already have a same job is running'
)
this
.
runningJobs
.
add
(
jobId
)
// setup timeout
if
(
executorTimeout
)
{
timeout
=
setTimeout
(
async
()
=>
await
this
.
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
error
:
new
Error
(
'timeout'
)
}),
executorTimeout
*
1000
)
}
return
Task
.
fromPromised
(
jobHandler
)(
jobLogger
,
jobParams
,
this
.
context
)
})
.
chain
((
result
)
=>
Task
.
of
({
result
}))
.
orElse
((
error
)
=>
Task
.
of
({
error
}))
.
chain
(
tapTask
(
async
({
result
,
error
})
=>
await
this
.
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
result
,
error
})))
.
run
().
promise
()
async
runJob
(
jobId
,
// 任务ID
jobJsonParams
,
// 任务参数(JSON格式)
logId
,
// 日志ID
logDateTime
,
// 日志时间
executorTimeout
,
// 执行超时时间(秒)
handlerName
,
// 处理器名称
jobHandler
,
// 任务处理函数
callback
// 回调函数
)
{
// 1. 初始化日志记录器
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let
result
,
timeoutTimer
,
error
try
{
if
(
this
.
hasJob
(
jobId
))
{
throw
new
Error
(
'已有相同任务正在运行'
)
}
this
.
runningJobs
.
add
(
jobId
)
let
jobParams
try
{
jobParams
=
JSON
.
parse
(
jobJsonParams
);
}
catch
{
jobJsonParams
=
{}
}
if
(
executorTimeout
)
{
timeoutTimer
=
setTimeout
(
()
=>
this
.
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
:
null
,
error
:
new
Error
(
'任务执行超时'
)
}),
executorTimeout
*
1000
);
}
result
=
await
jobHandler
(
jobParams
,
this
.
context
);
}
catch
(
error
)
{
error
=
error
}
await
this
.
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
,
error
});
}
/**
...
...
@@ -108,19 +141,16 @@ class JobManager {
* @param {*} error
* @return {Promise<void>}
*/
async
finishJob
({
jobId
,
logId
,
jobLogger
,
callback
,
timeout
,
result
,
error
})
{
async
finishJob
({
jobId
,
logId
,
callback
,
timeoutTimer
,
result
,
error
})
{
try
{
timeout
&&
clearTimeout
(
timeout
)
result
&&
jobLogger
.
trace
(
'result: %o'
,
result
)
error
&&
jobLogger
.
err
(
'error: %o'
,
error
.
message
||
error
)
jobLogger
.
trace
(
'end'
)
jobLogger
.
close
()
timeoutTimer
&&
clearTimeout
(
timeoutTimer
)
await
callback
(
error
,
{
logId
,
result
})
}
catch
(
err
)
{
log
.
err
(
'finishJob error: %o'
,
err
.
message
||
err
)
console
.
log
(
`finishJob error:
${
err
.
message
}
`
)
}
finally
{
this
.
runningJobs
.
delete
(
jobId
)
}
this
.
runningJobs
.
delete
(
jobId
)
}
}
module
.
exports
=
JobManager
}
\ No newline at end of file
src/xxl_job/logger.ts
查看文件 @
ee207e6
import
debug
from
'debug'
;
import
fs
from
'fs'
;
import
os
from
'os'
;
import
util
from
'util'
;
//
import debug from 'debug';
//
import fs from 'fs';
//
import os from 'os';
//
import util from 'util';
// 类型定义
type
LogLevel
=
'error'
|
'info'
|
'warn'
|
'debug'
|
'trace'
;
//
//
类型定义
//
type LogLevel = 'error' | 'info' | 'warn' | 'debug' | 'trace';
type
LoggerMethods
=
{
[
key
in
LogLevel
]:
debug
.
Debugger
;
}
&
{
close
:
()
=>
void
;
};
//
type LoggerMethods = {
//
[key in LogLevel]: debug.Debugger;
//
} & {
//
close: () => void;
//
};
interface
LoggerOptions
{
namespace
:
string
;
logFilePath
?:
string
;
enabledLevels
?:
string
;
debugEnabled
?:
boolean
;
}
//
interface LoggerOptions {
//
namespace: string;
//
logFilePath?: string;
//
enabledLevels?: string;
//
debugEnabled?: boolean;
//
}
// 配置解析
const
parseEnvBool
=
(
envVar
:
string
):
boolean
=>
{
const
value
=
process
.
env
[
envVar
]
||
''
;
return
/^
(
yes|on|true|enable|enabled|1
)
$/i
.
test
(
value
);
};
//
//
配置解析
//
const parseEnvBool = (envVar: string): boolean => {
//
const value = process.env[envVar] || '';
//
return /^(yes|on|true|enable|enabled|1)$/i.test(value);
//
};
const
getEnvValue
=
(
envVar
:
string
,
defaultValue
:
string
):
string
=>
{
return
process
.
env
[
envVar
]
!==
undefined
?
process
.
env
[
envVar
]
:
defaultValue
;
};
//
const getEnvValue = (envVar: string, defaultValue: string): string => {
//
return process.env[envVar] !== undefined ? process.env[envVar] : defaultValue;
//
};
// 默认配置
const
DEFAULT_LOG_LEVELS
=
'info:*,warn:*,error:*,debug:*,trace:*'
;
const
WRITE_STREAM_OPTIONS
:
fs
.
WriteStreamOptions
=
{
flags
:
'a'
,
encoding
:
'utf8'
,
autoClose
:
true
,
emitClose
:
true
};
//
//
默认配置
//
const DEFAULT_LOG_LEVELS = 'info:*,warn:*,error:*,debug:*,trace:*';
//
const WRITE_STREAM_OPTIONS: fs.WriteStreamOptions = {
//
flags: 'a',
//
encoding: 'utf8',
//
autoClose: true,
//
emitClose: true
//
};
// 空日志函数
const
noop
=
(...
_args
:
any
[]):
void
=>
{};
const
noopLogger
:
LoggerMethods
=
{
error
:
noop
,
info
:
noop
,
warn
:
noop
,
debug
:
noop
,
trace
:
noop
,
close
:
noop
};
//
//
空日志函数
//
const noop = (..._args: any[]): void => {};
//
const noopLogger: LoggerMethods = {
//
error: noop,
//
info: noop,
//
warn: noop,
//
debug: noop,
//
trace: noop,
//
close: noop
//
};
// 创建基础日志函数
const
createBaseLoggers
=
(
namespace
:
string
):
Omit
<
LoggerMethods
,
'close'
>
=>
{
return
{
error
:
debug
(
'error'
).
extend
(
namespace
),
info
:
debug
(
'info'
).
extend
(
namespace
),
warn
:
debug
(
'warn'
).
extend
(
namespace
),
debug
:
debug
(
'debug'
).
extend
(
namespace
),
trace
:
debug
(
'trace'
).
extend
(
namespace
)
};
};
//
//
创建基础日志函数
//
const createBaseLoggers = (namespace: string): Omit<LoggerMethods, 'close'> => {
//
return {
//
error: debug('error').extend(namespace),
//
info: debug('info').extend(namespace),
//
warn: debug('warn').extend(namespace),
//
debug: debug('debug').extend(namespace),
//
trace: debug('trace').extend(namespace)
//
};
//
};
// 配置日志级别
const
configureLogLevels
=
(
logger
:
Omit
<
LoggerMethods
,
'close'
>
,
enabledLevels
:
string
):
void
=>
{
Object
.
entries
(
logger
).
forEach
(([
level
])
=>
{
logger
[
level
as
LogLevel
].
enabled
=
enabledLevels
.
includes
(
level
);
});
};
//
//
配置日志级别
//
const configureLogLevels = (
//
logger: Omit<LoggerMethods, 'close'>,
//
enabledLevels: string
//
): void => {
//
Object.entries(logger).forEach(([level]) => {
//
logger[level as LogLevel].enabled = enabledLevels.includes(level);
//
});
//
};
// 创建文件日志写入器
const
createFileLogger
=
(
logFilePath
:
string
,
logger
:
Omit
<
LoggerMethods
,
'close'
>
,
enabledLevels
:
string
):
LoggerMethods
=>
{
const
writeStream
=
fs
.
createWriteStream
(
logFilePath
,
WRITE_STREAM_OPTIONS
);
//
//
创建文件日志写入器
//
const createFileLogger = (
//
logFilePath: string,
//
logger: Omit<LoggerMethods, 'close'>,
//
enabledLevels: string
//
): LoggerMethods => {
//
const writeStream = fs.createWriteStream(logFilePath, WRITE_STREAM_OPTIONS);
const
logToFile
=
(...
args
:
any
[]):
void
=>
{
writeStream
.
write
(
`
${
util
.
format
(...
args
)}${
os
.
EOL
}
`
);
};
//
const logToFile = (...args: any[]): void => {
//
writeStream.write(`${util.format(...args)}${os.EOL}`);
//
};
const
logToConsoleAndFile
=
(...
args
:
any
[]):
void
=>
{
const
message
=
util
.
format
(...
args
);
console
.
error
(
message
);
writeStream
.
write
(
`
${
message
}${
os
.
EOL
}
`
);
};
//
const logToConsoleAndFile = (...args: any[]): void => {
//
const message = util.format(...args);
//
console.error(message);
//
writeStream.write(`${message}${os.EOL}`);
//
};
// 配置日志输出方式
Object
.
entries
(
logger
).
forEach
(([
level
,
logFn
])
=>
{
logFn
.
log
=
enabledLevels
.
includes
(
level
)
?
logToConsoleAndFile
:
logToFile
;
});
//
// 配置日志输出方式
//
Object.entries(logger).forEach(([level, logFn]) => {
//
logFn.log = enabledLevels.includes(level) ? logToConsoleAndFile : logToFile;
//
});
// 返回完整logger对象
return
{
...
logger
,
close
:
():
void
=>
{
writeStream
.
end
();
// 切换回仅控制台日志
Object
.
values
(
logger
).
forEach
((
logFn
)
=>
{
logFn
.
log
=
console
.
error
;
});
}
};
};
//
// 返回完整logger对象
//
return {
//
...logger,
//
close: (): void => {
//
writeStream.end();
//
// 切换回仅控制台日志
//
Object.values(logger).forEach((logFn) => {
//
logFn.log = console.error;
//
});
//
}
//
};
//
};
// 主导出函数
export
const
createLogger
=
(
options
:
LoggerOptions
|
string
):
LoggerMethods
=>
{
// 处理参数重载
const
normalizedOptions
=
typeof
options
===
'string'
?
{
namespace
:
options
}
:
options
;
//
//
主导出函数
//
export const createLogger = (
//
options: LoggerOptions | string
//
): LoggerMethods => {
//
// 处理参数重载
//
const normalizedOptions = typeof options === 'string'
//
? { namespace: options }
//
: options;
const
{
namespace
,
logFilePath
,
enabledLevels
=
getEnvValue
(
'DEBUG'
,
DEFAULT_LOG_LEVELS
),
debugEnabled
=
parseEnvBool
(
'XXL_JOB_DEBUG_LOG'
)
}
=
normalizedOptions
;
//
const {
//
namespace,
//
logFilePath,
//
enabledLevels = getEnvValue('DEBUG', DEFAULT_LOG_LEVELS),
//
debugEnabled = parseEnvBool('XXL_JOB_DEBUG_LOG')
//
} = normalizedOptions;
if
(
!
debugEnabled
&&
!
logFilePath
)
{
return
noopLogger
;
}
//
if (!debugEnabled && !logFilePath) {
//
return noopLogger;
//
}
const
logger
=
createBaseLoggers
(
namespace
);
//
const logger = createBaseLoggers(namespace);
// 配置日志级别
configureLogLevels
(
logger
,
enabledLevels
);
//
// 配置日志级别
//
configureLogLevels(logger, enabledLevels);
if
(
!
logFilePath
)
{
return
{
...
logger
,
close
:
noop
};
}
//
if (!logFilePath) {
//
return {
//
...logger,
//
close: noop
//
};
//
}
// 文件日志配置
return
createFileLogger
(
logFilePath
,
logger
,
enabledLevels
);
};
//
// 文件日志配置
//
return createFileLogger(logFilePath, logger, enabledLevels);
//
};
// 默认导出
export
default
createLogger
;
\ No newline at end of file
// // 默认导出
// export default createLogger;
\ No newline at end of file
编写
预览
支持
Markdown
格式
附加文件
你添加了
0
人
到此讨论。请谨慎行事。
Finish editing this message first!
Cancel
请
注册
或
登录
后发表评论