job_manager.ts
4.0 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import * as moment from 'moment'
import * as Path from 'path'
// const Path = require('path')
// const logger = require('./logger')
// const log = logger('xxl-job-manager')
// const { Task, tapTask } = require('./purefuncs')
// const { mkdir, searchInFile } = require('./file')
/**
* 任务管理
*/
export class JobManager {
context
runningJobs
jobLogPath
/**
* @param {string} jobLogPath
* @param {*} context
*/
constructor(jobLogPath, context) {
// mkdir(jobLogPath)
this.jobLogPath = jobLogPath
this.context = context
this.runningJobs = new Set()
}
/**
* 根据调度时间获取日志文件路径
* @param {number} dateTime
* @return {string}
*/
getLogFilePath(dateTime) {
return Path.resolve(process.cwd(), `${this.jobLogPath}/${moment(dateTime, 'x').format('YYYY-MM-DD')}.log`)
}
/**
* 构造任务logger的namespace
* @param {string} handlerName
* @param {number} dateTime
* @param {number} logId
* @return {string}
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
return `${handlerName}-${moment(dateTime, 'x').format('YYMMDD')}-${logId}-executing`
}
/**
* @param {{number}} jobId
* @return {boolean}
*/
hasJob(jobId) {
return this.runningJobs.has(jobId)
}
async runJob(
jobId, // 任务ID
jobJsonParams, // 任务参数(JSON格式)
logId, // 日志ID
logDateTime, // 日志时间
executorTimeout, // 执行超时时间(秒)
handlerName, // 处理器名称
jobHandler, // 任务处理函数
callback // 回调函数
) {
// 1. 初始化日志记录器
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let result, timeoutTimer, error
let use_time = new Date().getTime()
try {
if (this.hasJob(jobId)) {
throw new Error('已有相同任务正在运行')
}
this.runningJobs.add(jobId)
let jobParams
try {
jobParams = JSON.parse(jobJsonParams);
} catch {
jobJsonParams = {}
}
if (executorTimeout) {
timeoutTimer = setTimeout(
() => this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: new Date().getTime() - use_time,
result: null,
error: new Error('任务执行超时')
}),
executorTimeout * 1000
);
}
result = await jobHandler(jobParams, this.context);
if (result && result.err) {
error = result.err
delete result.err
if (Object.keys(result).length === 0) {
result = null
}
}
} catch (err) {
error = err
}
await this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: new Date().getTime() - use_time,
result,
error
});
}
/**
* @param {number} logDateTime
* @param {number} logId
* @return {Promise<Array>}
*/
async readJobLog(logDateTime, logId) {
const logFilePath = this.getLogFilePath(logDateTime)
const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' '
return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : []
}
/**
* @param {number} jobId
* @param {number} logId
* @param {*} jobLogger
* @param {function} callback
* @param {number} timeoutTimer
* @param {number} use_time
* @param {*} result
* @param {*} error
* @return {Promise<void>}
*/
async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
try {
timeoutTimer && clearTimeout(timeoutTimer)
await callback(error, { logId, use_time, result })
} catch (err) {
console.log(`finishJob error: ${err.message}`)
} finally {
this.runningJobs.delete(jobId)
}
}
}