fast.ts
4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import _ = require('lodash')
import * as path from 'path'
import * as rp from 'request-promise'
import * as fastify from 'fastify'
import { requiredir } from '../common'
import { Log } from '../log'
import { config } from '../service';
let logger: Log
const api = {}
const fast = fastify({ trustProxy: true })
let targetPort = 0
export function init(cfg: any, callback?: Function) {
logger = new Log(`msbase_fastify`)
if (cfg.targetPort) targetPort = cfg.targetPort
const start = () => {
if (!cfg.port) return
try {
_.forEach(cfg.apipath ? cfg.apipath : ['./build/services/'], requiredir)
} catch (error) { logger.log(error.message); }
logger.log('api initialized');
// 路由 及 熔断
let circuitBreakerOpt = { threshold: 3, timeout: 3000, resetTimeout: 3000 }
if (cfg.circuitBreaker) circuitBreakerOpt = { ...circuitBreakerOpt, ...cfg.circuitBreaker }
fast.register(require('fastify-circuit-breaker'), circuitBreakerOpt)
// 压缩
fast.register(require('fastify-compress'), { global: false });
try {
const route = cfg.routefile ? require(path.resolve(cfg.routefile)) : null;
if (route) route(fast)
} catch (error) { logger.log(error.message); }
_.keys(api).length > 0 && fast.register(function (instance: any, opts, next) {
instance.route({
method: 'POST',
url: '/:cmd/',
// beforeHandler: instance.circuitBreaker(),
handler: async (request, reply) => {
let payload: any = {}
const start = Date.now()
const name = request.params.cmd.toLowerCase()
const bodyJson = JSON.stringify(request.body)
logger.debug(`1-api-${name} args-${bodyJson}`, { method_name: name })
try {
const action = api[name]
if (!action) throw new Error(`${name} does not exist in ${cfg.name}`)
payload.result = await action(request.body);
} catch (error) {
payload.error = error.message
}
const hs = Date.now() - start
let spend_time = Math.floor(hs / 50)
if (spend_time > 99) spend_time = 99
logger.debug(`2-api-${name} args-${bodyJson} result-${JSON.stringify(payload)} 耗时-${hs}`, { method_name: name, spend_time: spend_time })
// reply.send(payload)
reply.compress(payload)
}
})
console.log('start ms api route')
next()
});
fast.listen(cfg.port, '0.0.0.0', (err, address) => {
if (err) throw err
logger.info(`server listening on ${address}`)
})
}
callback ? callback(start) : start()
}
interface Pattern {
pubsub$?: boolean
timeout$?: number
maxMessages$?: number
expectedMessages$?: number
[key: string]: any
}
export function add(args: Pattern, callback: Function) {
api[args.cmd] = callback
console.log(`api '${args.cmd}' success`)
}
export async function act(args: Pattern, isThrowError = false) {
if (!args.topic || !args.cmd) throw new Error('topic or cmd is null')
if (args.topic) args.topic = args.topic.toLowerCase()
if (args.cmd) args.cmd = args.cmd.toLowerCase()
const args_obj = _.pick(args, ['data', 'context']);
const body = JSON.stringify(args_obj)
if (args.topic === config.name) return api[args.cmd]({ ...args_obj });
const url = `http://service-${args.topic}${targetPort ? (':' + targetPort) : ''}/${args.cmd}/`
const start = Date.now()
let ret: any = await rp.post(url, { body, gzip: true, forever: true, headers: { 'Content-Type': 'application/json' } })
.then(body => JSON.parse(body))
.catch(e => { return { error: e.message } })
const hs = Date.now() - start
let spend_time = Math.floor(hs / 50)
if (spend_time > 99) spend_time = 99
const msg = `act--${url}|${body}|${JSON.stringify(ret)}|${hs}`
const opts = { method_name: args.cmd, spend_time: spend_time }
logger.debug(msg, opts)
if (ret) {
if ('error' in ret) {
logger.error(msg, opts)
if (isThrowError) throw new Error(ret.error)
}
if ('result' in ret) {
ret = ret.result
}
}
return ret;
}