job_manager.js
3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"use strict";
// CommonJS interop marker: flags this module's exports as originating from an ES module.
Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder for the export; the real class is assigned after its definition below.
exports.JobManager = void 0;
const fs = require("fs");
const Path = require("path");
const moment = require("moment");
/**
 * Tracks in-flight jobs by id, runs job handlers with an optional timeout,
 * and reports each outcome exactly once through a caller-supplied callback.
 */
class JobManager {
    /** Value handed to every job handler as its second argument. */
    context;
    /** Set of job ids that have started and have not yet been reported as finished. */
    runningJobs;
    /** Log directory, resolved against process.cwd() when building log file paths. */
    jobLogPath;

    /**
     * @param {string} jobLogPath - Directory that holds the per-day log files.
     * @param {*} context - Passed through unchanged to each job handler.
     */
    constructor(jobLogPath, context) {
        this.jobLogPath = jobLogPath;
        this.context = context;
        this.runningJobs = new Set();
    }

    /**
     * Builds the absolute path of the log file for the day containing `dateTime`.
     *
     * @param {number|string} dateTime - Parsed with moment's 'x' (Unix milliseconds) token.
     * @returns {string} `<cwd>/<jobLogPath>/YYYY-MM-DD.log`, resolved to an absolute path.
     */
    getLogFilePath(dateTime) {
        const day = moment(dateTime, 'x').format('YYYY-MM-DD');
        return Path.resolve(process.cwd(), `${this.jobLogPath}/${day}.log`);
    }

    /**
     * Builds the logger namespace string for one job execution.
     *
     * @param {string} handlerName - Handler name used as the namespace prefix.
     * @param {number|string} dateTime - Parsed with moment's 'x' (Unix milliseconds) token.
     * @param {number|string} logId - Identifier of the execution log.
     * @returns {string} `<handlerName>-<YYMMDD>-<logId>-executing`.
     */
    getJobLoggerNamespace(handlerName, dateTime, logId) {
        const day = moment(dateTime, 'x').format('YYMMDD');
        return `${handlerName}-${day}-${logId}-executing`;
    }

    /**
     * @param {*} jobId - Job identifier.
     * @returns {boolean} True while a job with this id is tracked as running.
     */
    hasJob(jobId) {
        return this.runningJobs.has(jobId);
    }

    /**
     * Runs `jobHandler` for `jobId` and reports the outcome through `callback`.
     *
     * The callback is invoked exactly once per call: with the handler's result,
     * with the error it threw or returned as `result.err`, with a timeout error
     * if `executorTimeout` elapses first, or with a "duplicate job" error if a
     * job with the same id is already running. The handler is not cancelled on
     * timeout; its late outcome is simply discarded.
     *
     * @param {*} jobId - Identifier used to reject concurrent runs of the same job.
     * @param {string} jobJsonParams - JSON text of the handler parameters; unparsable text yields `{}`.
     * @param {*} logId - Echoed back in the callback payload.
     * @param {*} logDateTime - Not used by this method.
     * @param {number} executorTimeout - Timeout in seconds; a falsy value disables the timeout.
     * @param {string} handlerName - Not used by this method.
     * @param {function(*, *): *} jobHandler - Called as `jobHandler(params, context)`; may be async.
     * @param {function(*, {logId: *, use_time: number, result: *}): *} callback - Outcome reporter.
     * @returns {Promise<void>} Settles after the handler has settled and the outcome was reported.
     */
    async runJob(jobId, jobJsonParams, logId, logDateTime, executorTimeout, handlerName, jobHandler, callback) {
        const startedAt = Date.now();
        const elapsed = () => Date.now() - startedAt;

        if (this.hasJob(jobId)) {
            // Report the rejection without untracking: the id belongs to the run
            // that is still in progress, not to this rejected attempt.
            await this.finishJob({
                jobId,
                logId,
                callback,
                use_time: elapsed(),
                result: undefined,
                error: new Error('已有相同任务正在运行'),
                untrack: false
            });
            return;
        }
        this.runningJobs.add(jobId);

        let timeoutTimer;
        let settled = false;
        // Whichever of "timeout" and "handler finished" happens first wins; the
        // loser must neither call the callback again nor touch runningJobs.
        const finishOnce = async (result, error) => {
            if (settled) {
                return;
            }
            settled = true;
            await this.finishJob({
                jobId,
                logId,
                callback,
                timeoutTimer,
                use_time: elapsed(),
                result,
                error
            });
        };

        let result;
        let error;
        try {
            let jobParams;
            try {
                jobParams = JSON.parse(jobJsonParams);
            }
            catch {
                // Missing or malformed parameters: run the handler with an empty object.
                jobParams = {};
            }
            if (executorTimeout) {
                timeoutTimer = setTimeout(() => {
                    // finishJob handles callback failures itself, so this promise is safe to drop.
                    void finishOnce(null, new Error('任务执行超时'));
                }, executorTimeout * 1000);
            }
            result = await jobHandler(jobParams, this.context);
            if (result && result.err) {
                // Handlers may signal failure by returning `{ err }` instead of throwing.
                error = result.err;
                delete result.err;
                if (Object.keys(result).length === 0) {
                    result = null;
                }
            }
        }
        catch (err) {
            error = err;
        }
        await finishOnce(result, error);
    }

    /**
     * Reads the log lines recorded for one job execution.
     *
     * NOTE(review): relies on `fs` and `searchInFile` being available in module
     * scope. `searchInFile` is not defined or imported in the visible file, so
     * this throws a ReferenceError when the log file exists until that helper is
     * brought into scope. Presumably it returns the lines between the start
     * marker and the `<marker> end` line - confirm against its definition.
     *
     * @param {number|string} logDateTime - Selects the daily log file (see getLogFilePath).
     * @param {*} logId - Identifier of the execution log.
     * @returns {Promise<*>} Result of `searchInFile`, or `[]` when the log file does not exist.
     */
    async readJobLog(logDateTime, logId) {
        const logFilePath = this.getLogFilePath(logDateTime);
        // The handler name is left empty, so the marker starts with '-' followed by the date.
        const jobLogNamespace = this.getJobLoggerNamespace('', logDateTime, logId) + ' ';
        return fs.existsSync(logFilePath) ? await searchInFile(logFilePath, jobLogNamespace, `${jobLogNamespace} end`) : [];
    }

    /**
     * Clears the timeout timer, reports the outcome via `callback`, and stops
     * tracking the job. Errors thrown by the callback are logged, not rethrown.
     *
     * @param {object} outcome
     * @param {*} outcome.jobId - Id to remove from the running set.
     * @param {*} outcome.logId - Echoed back in the callback payload.
     * @param {function} outcome.callback - Called as `callback(error, { logId, use_time, result })`.
     * @param {*} [outcome.timeoutTimer] - Timer handle to clear, if any.
     * @param {number} outcome.use_time - Elapsed time in milliseconds.
     * @param {*} outcome.result - Handler result.
     * @param {*} outcome.error - Error to report, if any.
     * @param {boolean} [outcome.untrack=true] - When false, `jobId` stays in the running set.
     * @returns {Promise<void>}
     */
    async finishJob({ jobId, logId, callback, timeoutTimer, use_time, result, error, untrack = true }) {
        try {
            if (timeoutTimer) {
                clearTimeout(timeoutTimer);
            }
            await callback(error, { logId, use_time, result });
        }
        catch (err) {
            // Guarded access: a callback may reject with a non-Error value such as null.
            console.log(`finishJob error: ${err?.message ?? err}`);
        }
        finally {
            if (untrack) {
                this.runningJobs.delete(jobId);
            }
        }
    }
}
// Publish the class as the module's named export.
exports.JobManager = JobManager;
//# sourceMappingURL=job_manager.js.map