最近想希望通过Node将目前由Java实现的例如邮件发送,短信发送,消息通知,数据计算结构到Node中
将Node整合为服务中台,这就碰到一个问题,如何将结构的服务集成到微服务的体系中
微服务体系中核心节点,为服务拆分提供了有效的支撑
常见的有zookeeper(CP),eureka(AP),nacos(AP/CP)
CAP原则又称CAP定理,指的是在一个分布式系统中, 一致性 (Consistency)、 可用性 (Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾
Node环境下将服务向注册中心注册还是比较简单的,引入官方依赖生成实例即可
/**
* eureka注册模块
*/
'use script'
const Eureka = require('eureka-js-client').Eureka
module.exports = new Eureka({
logger: global.LOGGER,
instance: {
app: global.APPLICATION_CONFIG.application.name, // 服务名称 app
hostName: global.APPLICATION_CONFIG.application.host, // 请求地址 localhost
instanceId: `${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}`, // localhost:9000
ipAddr: global.APPLICATION_CONFIG.application.host, // IP地址
statusPageUrl: `http://${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}/status`, // status
port: { // 端口, 必须这个格式
$: global.RUNTIME.port,
'@enabled': 'true'
},
// 向eureka注册的服务名 feign调用时使用 app
vipAddress: global.APPLICATION_CONFIG.application.name,
// 本地搭建使用MyOwn且指定class
dataCenterInfo: {
'@class': 'com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo', // 缺少会导致404错误
name: 'MyOwn' // 'Netflix' | 'Amazon' | 'MyOwn'
}
},
eureka: {
// 如果开启了auth 需要加入用户名和密码
host: global.APPLICATION_CONFIG.eureka.host, // eureka地址,多个使用逗号分隔
port: global.APPLICATION_CONFIG.eureka.port, // 端口号
servicePath: global.APPLICATION_CONFIG.eureka.servicePath // 如果eureka没有更改过则是默认的 /eureka/apps
}
})
复制代码
/**
* nacos 注册服务
* 向nacos注册,订阅配置推送
* 通过events模块通知配置更新
*/
'use script'
const { NacosNamingClient, NacosConfigClient } = require('nacos')
const yaml = require('yamljs')
const logger = global.LOGGER
// 新建客户端
const client = new NacosNamingClient({
logger,
serverList: global.APPLICATION_CONFIG.nacos['service-list'],
namespace: global.APPLICATION_CONFIG.nacos.namespace
})
client.ready()
// 向nocos实例注册
client.registerInstance(global.APPLICATION_CONFIG.application.name, {
ip: global.APPLICATION_CONFIG.application.host,
port: global.RUNTIME.port
}, global.APPLICATION_CONFIG.nacos.namespace)
// 新建配置实例,接收推送
const config = new NacosConfigClient({
serverAddr: global.APPLICATION_CONFIG.nacos['service-list'],
namespace: global.APPLICATION_CONFIG.nacos.namespace
})
config.subscribe({
dataId: 'application-mercury-dev.yml',
group: global.APPLICATION_CONFIG.nacos.namespace
}, content => {
global.LOGGER.info('<= Publish Config Received')
const remoteConfig = yaml.parse(content)
global.LOGGER.info(`=>${JSON.stringify(remoteConfig)}`)
// 覆盖当前配置
Object.assign(global.APPLICATION_CONFIG, remoteConfig)
// 推送
global.EVENT_BUS.emit('resource-update')
})
exports.client = client
exports.config = config
复制代码
消息队列当然也要整合,http调用肯定不能满足高并发,高负载的场景。MQ可以有效的提供缓冲与解构,贴一下RabbitMQ的实现
需要注意的事,接收到消息不一定只有单个内部模块使用,所以也需要考虑内部的订阅发布
/**
* rabbit-mq 客户端封装
*/
'use script'
const amqp = require('amqplib')
module.exports = class RabbitMQ {
/**
* 构造器, 开启rabbit-mq的连接
* @param host
* @param port
* @param user
* @param pass
*/
constructor (host = 'localhost', port = 5672, user = '***', pass = '***') {
const self = this
self.hosts = host
self.consumer = []
amqp.connect({
hostname: host,
port: port,
username: user,
password: pass
}).then(conn => {
global.LOGGER.info('<= RabbitMQ Connected')
self.connect = conn
self.connect.createChannel().then(channel => {
global.LOGGER.info('<= Chanel Created')
self.channel = channel
self._consume()
})
}).catch(e => {
global.LOGGER.error(`<= RabbitMQ Connect Error ${e}`)
})
self.subscriber = {}
}
/**
* 添加消息队列监听
* @private
*/
_consume () {
const distribute = (message) => {
// 防止重复消费, 程序的错误将记录日志
try {
const content = message.content.toString()
global.LOGGER.info(`<= Recive Queue [${message.fields.routingKey}] Message ${content}`)
this.subscriber[message.fields.routingKey].forEach(callback => {
callback(JSON.parse(content))
})
} catch (e) {
global.LOGGER.error(e)
} finally {
// 接受消息并且确认
this.channel.ack(message)
}
}
this.consumer.forEach(consumer => {
if (!this.subscriber[consumer.topic]) {
this.channel.consume(consumer.topic, distribute)
this.subscriber[consumer.topic] = []
}
this.subscriber[consumer.topic].push(consumer.callback)
})
}
/**
* 消费
* @param topic
* @param callback
*/
subscribe (topic, callback) {
this.consumer.push({ topic: topic, callback: callback })
}
/**
* 推送
* @param topic
* @param message
*/
publish (topic, message) {
const stringMessage = JSON.stringify(message)
this.channel.sendToQueue(topic, Buffer.from(stringMessage))
global.LOGGER.info(`=> Publish Message ${stringMessage} `)
}
/**
* 获取通信管道
* @returns {any}
*/
connection () {
return this.channel
}
}
复制代码
如果希望能够让其他服务或者系统 快乐
的调用,还是需要提供SDK的,提供SDK的好处有
开箱即用,各种场景已经考虑到,并且依赖也已经整合
完善的测试,为上面的有点提供支持
entity
public class Mail {
/** 发件人 */
private String from;
/** 收件人 */
private String to;
/** 抄送 */
private String cc;
/** 秘抄 */
private String bcc;
/** 主题 */
private String subject;
/** 正文 */
private String text;
/** HTML正文 */
private String html;
public Mail() {
}
public Mail(String from, String to) {
this.from = from;
this.to = to;
}
public Mail(String from, String to, String subject) {
this.from = from;
this.to = to;
this.subject = subject;
}
public Mail(String from, String to, String cc, String subject, String text) {
this.from = from;
this.to = to;
this.cc = cc;
this.subject = subject;
this.text = text;
}
...
}
复制代码
@FeignClient(name = "thunder-mercury")
public interface MercuryService extends IService {
/**
* 发送邮件
*
* @param mail 邮件实体
* @return {@link BaseResponse}
*/
@PostMapping(value = "/v1/mail/")
BaseResponse<String> send(@RequestBody Mail mail);
...
}
复制代码