Commit 4b88d344 Harvey

任务多线程处理

1 个父辈 4976f269
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const express = require("express");
const config = require('../config/config.js');
console.log(config);
require('../config/config.js');
const config = global['config'];
if (config.enable_frpc) {
require('../frp');
}
......
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.job_handlers = void 0;
const job_handlers = new Map();
const path = require("path");
let job_handlers = new Map();
exports.job_handlers = job_handlers;
const fs = require('fs');
const path = require('path');
/**
* 递归加载指定目录下所有 .js 文件
* @param {string|string[]} dirPaths 要加载的目录路径(可以是字符串或数组)
* @param {object} [options] 配置选项
* @param {boolean} [options.ignoreNodeModules=true] 是否忽略 node_modules 目录
* @param {RegExp|function} [options.filter] 自定义过滤条件
* @returns {object} 包含所有加载模块的对象(以相对路径为键)
*/
function loadAllJSFiles(dirPaths, options = {}) {
const { ignoreNodeModules = true, filter = null } = options;
const loadedModules = {};
// 统一处理为数组形式
if (!Array.isArray(dirPaths)) {
dirPaths = [dirPaths];
if (global['config'].enable_piscina) {
const Piscina = require('piscina');
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js'),
maxThreads: require('os').cpus().length - 1,
minThreads: 1,
// maxQueue: 1000,
// idleTimeout: 60_000,
// resourceLimits: {
// maxOldGenerationSizeMb: 1024,
// maxYoungGenerationSizeMb: 256,
// stackSizeMb: 4,
// },
});
async function run_task(args) {
return piscina.run(args);
}
dirPaths.forEach(dirPath => {
// 解析为绝对路径
const absoluteDir = path.resolve(dirPath);
// 递归读取目录
function scanDirectory(currentDir, relativePath = '') {
const files = fs.readdirSync(currentDir);
files.forEach(file => {
const fullPath = path.join(currentDir, file);
const stat = fs.statSync(fullPath);
const newRelativePath = path.join(relativePath, file);
if (stat.isDirectory()) {
// 跳过 node_modules 目录(如果配置了忽略)
if (ignoreNodeModules && file === 'node_modules') {
return;
}
// 递归扫描子目录
scanDirectory(fullPath, newRelativePath);
}
else if (path.extname(file) === '.js' &&
(!filter ||
(typeof filter === 'function' && filter(fullPath)) ||
(filter instanceof RegExp && filter.test(fullPath)))) {
try {
// 加载模块并存储
const module = require(fullPath);
const moduleKey = newRelativePath.replace(/\.js$/, '');
loadedModules[moduleKey] = module;
}
catch (err) {
console.error(`加载模块失败: ${fullPath}`, err);
}
}
});
}
scanDirectory(absoluteDir);
run_task({ name: 'get_task_keys' }).then(function (data) {
data.forEach(function (name) {
job_handlers.set(name, (args) => run_task({ name, args }));
});
});
return loadedModules;
}
const modules = loadAllJSFiles(['./build/job'], {
ignoreNodeModules: true,
filter: (filePath) => !filePath.includes('build/job/index.js')
});
console.log(`=============开始加载任务=============`);
for (const key1 in modules) {
var m = modules[key1];
for (const key2 in m) {
if (typeof m[key2] === 'function') {
if (job_handlers.has(key2)) {
console.log(key2 + '已存在');
continue;
}
job_handlers.set(key2, m[key2]);
console.log(key2);
}
}
else {
exports.job_handlers = job_handlers = require('./worker')({ name: 'get_task_list' });
}
console.log(`=============任务加载完毕 共${job_handlers.size}个=============`);
//# sourceMappingURL=index.js.map
\ No newline at end of file
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const fs = require("fs");
const path = require('path');
const job_handlers = new Map();
/**
* 递归加载指定目录下所有 .js 文件
* @param {string|string[]} dirPaths 要加载的目录路径(可以是字符串或数组)
* @param {object} [options] 配置选项
* @param {boolean} [options.ignoreNodeModules=true] 是否忽略 node_modules 目录
* @param {RegExp|function} [options.filter] 自定义过滤条件
* @returns {object} 包含所有加载模块的对象(以相对路径为键)
*/
function loadAllJSFiles(dirPaths, options = {}) {
const { ignoreNodeModules = true, filter = null } = options;
const loadedModules = {};
// 统一处理为数组形式
if (!Array.isArray(dirPaths)) {
dirPaths = [dirPaths];
}
dirPaths.forEach(dirPath => {
// 解析为绝对路径
const absoluteDir = path.resolve(dirPath);
// 递归读取目录
function scanDirectory(currentDir, relativePath = '') {
const files = fs.readdirSync(currentDir);
files.forEach(file => {
const fullPath = path.join(currentDir, file);
const stat = fs.statSync(fullPath);
const newRelativePath = path.join(relativePath, file);
if (stat.isDirectory()) {
// 跳过 node_modules 目录(如果配置了忽略)
if (ignoreNodeModules && file === 'node_modules') {
return;
}
// 递归扫描子目录
scanDirectory(fullPath, newRelativePath);
}
else if (path.extname(file) === '.js' &&
(!filter ||
(typeof filter === 'function' && filter(fullPath)) ||
(filter instanceof RegExp && filter.test(fullPath)))) {
try {
// 加载模块并存储
const module = require(fullPath);
const moduleKey = newRelativePath.replace(/\.js$/, '');
loadedModules[moduleKey] = module;
}
catch (err) {
console.error(`加载模块失败: ${fullPath}`, err);
}
}
});
}
scanDirectory(absoluteDir);
});
return loadedModules;
}
(() => {
const modules = loadAllJSFiles(['./build/job'], {
ignoreNodeModules: true,
filter: (filePath) => !['build/job/index.js', 'build/job/worker.js'].find(item => filePath.includes(item))
});
console.log(`=============开始加载任务=============`);
for (const key1 in modules) {
var m = modules[key1];
for (const key2 in m) {
if (typeof m[key2] === 'function') {
if (job_handlers.has(key2)) {
console.log(key2 + '已存在');
continue;
}
job_handlers.set(key2, m[key2]);
// console.log(key2)
}
}
}
console.log(`=============任务加载完毕 共${job_handlers.size}个=============`);
})();
module.exports = ({ name, args }) => {
switch (name) {
case 'get_task_list':
return job_handlers;
case 'get_task_keys':
return Array.from(job_handlers.keys());
default:
break;
}
var func = job_handlers.get(name);
if (!func) {
throw new Error(`任务${name}不存在`);
}
return func(args);
};
//# sourceMappingURL=worker.js.map
\ No newline at end of file
//# sourceMappingURL=work.js.map
\ No newline at end of file
module.exports = ({ a, b }) => {
return a + b;
};
//# sourceMappingURL=worker.js.map
\ No newline at end of file
......@@ -25,7 +25,9 @@ try {
//是否启用服务器中转穿透
enable_frpc: true,
//是否启动多线程处理
enable_piscina: true,
};
}
module.exports = config;
global.config = config;
......@@ -14,7 +14,8 @@
"axios": "^1.9.0",
"body-parser": "^2.2.0",
"express": "^5.1.0",
"moment": "^2.30.1"
"moment": "^2.30.1",
"piscina": "^5.0.0"
},
"devDependencies": {
"@types/node": "^10.17.60"
......
import * as express from 'express'
const config = require('../config/config.js')
console.log(config)
require('../config/config.js')
const config = global['config'];
if(config.enable_frpc){
if( config.enable_frpc){
require('../frp')
}
......
const job_handlers = new Map();
const fs = require('fs');
const path = require('path');
/**
* 递归加载指定目录下所有 .js 文件
* @param {string|string[]} dirPaths 要加载的目录路径(可以是字符串或数组)
* @param {object} [options] 配置选项
* @param {boolean} [options.ignoreNodeModules=true] 是否忽略 node_modules 目录
* @param {RegExp|function} [options.filter] 自定义过滤条件
* @returns {object} 包含所有加载模块的对象(以相对路径为键)
*/
function loadAllJSFiles(dirPaths, options: any = {}) {
const {
ignoreNodeModules = true,
filter = null
}: any = options;
const loadedModules = {};
import * as path from 'path'
let job_handlers: any = new Map();
if (global['config'].enable_piscina) {
const Piscina = require('piscina')
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js'),
maxThreads: require('os').cpus().length - 1,
minThreads: 1,
// maxQueue: 1000,
// idleTimeout: 60_000,
// resourceLimits: {
// maxOldGenerationSizeMb: 1024,
// maxYoungGenerationSizeMb: 256,
// stackSizeMb: 4,
// },
});
// 统一处理为数组形式
if (!Array.isArray(dirPaths)) {
dirPaths = [dirPaths];
async function run_task(args) {
return piscina.run(args);
}
dirPaths.forEach(dirPath => {
// 解析为绝对路径
const absoluteDir = path.resolve(dirPath);
// 递归读取目录
function scanDirectory(currentDir, relativePath = '') {
const files = fs.readdirSync(currentDir);
files.forEach(file => {
const fullPath = path.join(currentDir, file);
const stat = fs.statSync(fullPath);
const newRelativePath = path.join(relativePath, file);
if (stat.isDirectory()) {
// 跳过 node_modules 目录(如果配置了忽略)
if (ignoreNodeModules && file === 'node_modules') {
return;
}
// 递归扫描子目录
scanDirectory(fullPath, newRelativePath);
} else if (
path.extname(file) === '.js' &&
(!filter ||
(typeof filter === 'function' && filter(fullPath)) ||
(filter instanceof RegExp && filter.test(fullPath)))
) {
try {
// 加载模块并存储
const module = require(fullPath);
const moduleKey = newRelativePath.replace(/\.js$/, '');
loadedModules[moduleKey] = module;
} catch (err) {
console.error(`加载模块失败: ${fullPath}`, err);
}
}
});
}
scanDirectory(absoluteDir);
run_task({ name: 'get_task_keys' }).then(function (data) {
data.forEach(function (name) {
job_handlers.set(name, (args) => run_task({ name, args }));
})
});
return loadedModules;
}
const modules = loadAllJSFiles(['./build/job'], {
ignoreNodeModules: true,
filter: (filePath) => !filePath.includes('build/job/index.js')
});
console.log(`=============开始加载任务=============`)
for (const key1 in modules) {
var m = modules[key1];
for (const key2 in m) {
if (typeof m[key2] === 'function') {
if (job_handlers.has(key2)) {
console.log(key2 + '已存在')
continue;
}
job_handlers.set(key2, m[key2])
console.log(key2)
}
}
} else {
job_handlers = require('./worker')({ name: 'get_task_list' })
}
console.log(`=============任务加载完毕 共${job_handlers.size}个=============`)
export { job_handlers }
export { job_handlers }
\ No newline at end of file
import * as fs from 'fs'
const path = require('path')
const job_handlers = new Map();
/**
* 递归加载指定目录下所有 .js 文件
* @param {string|string[]} dirPaths 要加载的目录路径(可以是字符串或数组)
* @param {object} [options] 配置选项
* @param {boolean} [options.ignoreNodeModules=true] 是否忽略 node_modules 目录
* @param {RegExp|function} [options.filter] 自定义过滤条件
* @returns {object} 包含所有加载模块的对象(以相对路径为键)
*/
function loadAllJSFiles(dirPaths, options: any = {}) {
const {
ignoreNodeModules = true,
filter = null
}: any = options;
const loadedModules = {};
// 统一处理为数组形式
if (!Array.isArray(dirPaths)) {
dirPaths = [dirPaths];
}
dirPaths.forEach(dirPath => {
// 解析为绝对路径
const absoluteDir = path.resolve(dirPath);
// 递归读取目录
function scanDirectory(currentDir, relativePath = '') {
const files = fs.readdirSync(currentDir);
files.forEach(file => {
const fullPath = path.join(currentDir, file);
const stat = fs.statSync(fullPath);
const newRelativePath = path.join(relativePath, file);
if (stat.isDirectory()) {
// 跳过 node_modules 目录(如果配置了忽略)
if (ignoreNodeModules && file === 'node_modules') {
return;
}
// 递归扫描子目录
scanDirectory(fullPath, newRelativePath);
} else if (
path.extname(file) === '.js' &&
(!filter ||
(typeof filter === 'function' && filter(fullPath)) ||
(filter instanceof RegExp && filter.test(fullPath)))
) {
try {
// 加载模块并存储
const module = require(fullPath);
const moduleKey = newRelativePath.replace(/\.js$/, '');
loadedModules[moduleKey] = module;
} catch (err) {
console.error(`加载模块失败: ${fullPath}`, err);
}
}
});
}
scanDirectory(absoluteDir);
});
return loadedModules;
}
(() => {
const modules = loadAllJSFiles(['./build/job'], {
ignoreNodeModules: true,
filter: (filePath) => !['build/job/index.js', 'build/job/worker.js'].find(item => filePath.includes(item))
});
console.log(`=============开始加载任务=============`)
for (const key1 in modules) {
var m = modules[key1];
for (const key2 in m) {
if (typeof m[key2] === 'function') {
if (job_handlers.has(key2)) {
console.log(key2 + '已存在')
continue;
}
job_handlers.set(key2, m[key2])
// console.log(key2)
}
}
}
console.log(`=============任务加载完毕 共${job_handlers.size}个=============`)
})()
module.exports = ({ name, args }) => {
switch (name) {
case 'get_task_list':
return job_handlers;
case 'get_task_keys':
return Array.from(job_handlers.keys());
default:
break;
}
var func = job_handlers.get(name);
if (!func) {
throw new Error(`任务${name}不存在`)
}
return func(args)
}
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!