job-manager.ts
4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/**
* 任务管理系统 - 用于管理并发任务,支持日志记录和超时控制
*/
export class JobManager {
private runningJobs; // 存储正在运行的任务ID集合
private jobLogPath; // 任务日志存储路径
/**
* 构造函数
* @param {string} jobLogPath - 任务日志存储路径
*/
constructor(jobLogPath) {
this.jobLogPath = jobLogPath;
this.runningJobs = new Set(); // 使用Set存储运行中任务ID,保证唯一性
}
/**
* 根据执行时间获取日志文件路径
* @param {number} dateTime - 任务执行时间戳
* @return {string} - 日志文件完整路径
*/
getLogFilePath(dateTime) {
// 待实现:根据时间戳生成日志文件路径
// 示例:按日期分目录存储
}
/**
* 生成任务日志器的命名空间
* @param {string} handlerName - 任务处理器名称
* @param {number} dateTime - 任务执行时间戳
* @param {number} logId - 日志唯一ID
* @return {string} - 日志器命名空间字符串
*/
getJobLoggerNamespace(handlerName, dateTime, logId) {
// 待实现:生成格式化的日志命名空间
// 示例:`job:${handlerName}:${dateTime}:${logId}`
}
/**
* 检查任务是否正在运行
* @param {number} jobId - 任务ID
* @return {boolean} - 是否存在该运行中任务
*/
hasJob(jobId) {
return this.runningJobs.has(jobId);
}
/**
* 执行任务
* @param {number} jobId - 任务ID
* @param {string} jobJsonParams - 任务参数(JSON字符串格式)
* @param {number} logId - 日志ID
* @param {number} logDateTime - 日志时间戳
* @param {number} executorTimeout - 执行超时时间(秒)
* @param {string} handlerName - 处理器名称
* @param {function} jobHandler - 任务处理函数
* @param {function} callback - 任务完成回调函数
*/
async runJob(
jobId,
jobJsonParams,
logId,
logDateTime,
executorTimeout,
handlerName,
jobHandler,
callback
) {
// 1. 初始化日志记录器(待实现)
// const loggerNamespace = this.getJobLoggerNamespace(handlerName, logDateTime, logId);
// const logFilePath = this.getLogFilePath(logDateTime);
// const jobLogger = logger(loggerNamespace, logFilePath);
let result, timeoutTimer, error;
const startTime = Date.now();
try {
// 检查是否有重复任务
if (this.hasJob(jobId)) {
throw new Error('已有相同任务正在运行');
}
this.runningJobs.add(jobId);
// 解析任务参数
let jobParams = {};
try {
jobParams = JSON.parse(jobJsonParams) || {};
} catch { }
// 设置超时定时器
if (executorTimeout) {
timeoutTimer = setTimeout(
() => this.finishJob({
jobId,
logId,
callback,
timeoutTimer: null,
use_time: Date.now() - startTime,
result: null,
error: new Error('任务执行超时')
}),
executorTimeout * 1000
);
}
// 执行任务处理函数
result = await jobHandler(jobParams);
// 处理可能存在的错误结果
if (result && result.error) {
error = result.error;
delete result.error;
if (Object.keys(result).length === 0) {
result = undefined;
}
}
} catch (err) {
error = err;
} finally {
await this.finishJob({
jobId,
logId,
callback,
timeoutTimer,
use_time: Date.now() - startTime,
result,
error
});
}
}
/**
* 读取任务日志
* @param {number} logDateTime - 日志时间戳
* @param {number} logId - 日志ID
* @return {Promise<Array>} - 日志内容数组
*/
async readJobLog(logDateTime, logId) {
// 待实现:读取指定日志文件内容
}
/**
* 完成任务处理
* @param {Object} params - 任务完成参数对象
* @param {number} params.jobId - 任务ID
* @param {number} params.logId - 日志ID
* @param {function} params.callback - 回调函数
* @param {number} params.timeoutTimer - 超时定时器
* @param {number} params.use_time - 任务耗时(毫秒)
* @param {*} params.result - 任务结果
* @param {Error} params.error - 任务错误
*/
async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error }) {
try {
// 清除超时定时器
timeoutTimer && clearTimeout(timeoutTimer);
// 执行回调
await callback({ error, logId, use_time, result });
} catch (err) {
console.error(`完成任务时发生错误: ${err.message}`);
} finally {
// 无论成功与否,都从运行集合中移除
this.runningJobs.delete(jobId);
}
}
}