转载

Node集成到微服务体系

Node集成到微服务体系

最近想希望通过Node将目前由Java实现的例如邮件发送,短信发送,消息通知,数据计算结构到Node中

将Node整合为服务中台,这就碰到一个问题,如何将结构的服务集成到微服务的体系中

注册中心

微服务体系中核心节点,为服务拆分提供了有效的支撑

常见的有zookeeper(CP),eureka(AP),nacos(AP/CP)

CAP原则又称CAP定理,指的是在一个分布式系统中, 一致性 (Consistency)、 可用性 (Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾

Node环境下将服务向注册中心注册还是比较简单的,引入官方依赖生成实例即可

  • eureka eureka-js-client
/**
 * eureka注册模块
 */
'use script'
const Eureka = require('eureka-js-client').Eureka
module.exports = new Eureka({
  logger: global.LOGGER,
  instance: {
    app: global.APPLICATION_CONFIG.application.name, // 服务名称 app
    hostName: global.APPLICATION_CONFIG.application.host, // 请求地址 localhost
    instanceId: `${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}`, // localhost:9000
    ipAddr: global.APPLICATION_CONFIG.application.host, // IP地址
    statusPageUrl: `http://${global.APPLICATION_CONFIG.application.host}:${global.RUNTIME.port}/status`, // status
    port: { // 端口, 必须这个格式
      $: global.RUNTIME.port,
      '@enabled': 'true'
    },
    // 向eureka注册的服务名 feign调用时使用 app
    vipAddress: global.APPLICATION_CONFIG.application.name,
    // 本地搭建使用MyOwn且指定class
    dataCenterInfo: {
      '@class': 'com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo', // 缺少会导致404错误
      name: 'MyOwn' // 'Netflix' | 'Amazon' | 'MyOwn'
    }
  },
  eureka: {
    // 如果开启了auth 需要加入用户名和密码
    host: global.APPLICATION_CONFIG.eureka.host, // eureka地址,多个使用逗号分隔
    port: global.APPLICATION_CONFIG.eureka.port, // 端口号
    servicePath: global.APPLICATION_CONFIG.eureka.servicePath // 如果eureka没有更改过则是默认的 /eureka/apps
  }
})
复制代码
  • nacos nacos-sdk-nodejs
/**
 * nacos 注册服务
 * 向nacos注册,订阅配置推送
 * 通过events模块通知配置更新
 */
'use script'
const { NacosNamingClient, NacosConfigClient } = require('nacos')
const yaml = require('yamljs')
const logger = global.LOGGER
// 新建客户端
const client = new NacosNamingClient({
  logger,
  serverList: global.APPLICATION_CONFIG.nacos['service-list'],
  namespace: global.APPLICATION_CONFIG.nacos.namespace
})
client.ready()
// 向nocos实例注册
client.registerInstance(global.APPLICATION_CONFIG.application.name, {
  ip: global.APPLICATION_CONFIG.application.host,
  port: global.RUNTIME.port
}, global.APPLICATION_CONFIG.nacos.namespace)

// 新建配置实例,接收推送
const config = new NacosConfigClient({
  serverAddr: global.APPLICATION_CONFIG.nacos['service-list'],
  namespace: global.APPLICATION_CONFIG.nacos.namespace
})
config.subscribe({
  dataId: 'application-mercury-dev.yml',
  group: global.APPLICATION_CONFIG.nacos.namespace
}, content => {
  global.LOGGER.info('<= Publish Config Received')
  const remoteConfig = yaml.parse(content)
  global.LOGGER.info(`=>${JSON.stringify(remoteConfig)}`)
  // 覆盖当前配置
  Object.assign(global.APPLICATION_CONFIG, remoteConfig)
  // 推送
  global.EVENT_BUS.emit('resource-update')
})
exports.client = client
exports.config = config
复制代码

消息队列

消息队列当然也要整合,http调用肯定不能满足高并发,高负载的场景。MQ可以有效的提供缓冲与解构,贴一下RabbitMQ的实现

需要注意的事,接收到消息不一定只有单个内部模块使用,所以也需要考虑内部的订阅发布

/**
 * rabbit-mq 客户端封装
 */
'use script'
const amqp = require('amqplib')
module.exports = class RabbitMQ {
  /**
   * 构造器, 开启rabbit-mq的连接
   * @param host
   * @param port
   * @param user
   * @param pass
   */
  constructor (host = 'localhost', port = 5672, user = '***', pass = '***') {
    const self = this
    self.hosts = host
    self.consumer = []
    amqp.connect({
      hostname: host,
      port: port,
      username: user,
      password: pass
    }).then(conn => {
      global.LOGGER.info('<= RabbitMQ Connected')
      self.connect = conn
      self.connect.createChannel().then(channel => {
        global.LOGGER.info('<= Chanel Created')
        self.channel = channel
        self._consume()
      })
    }).catch(e => {
      global.LOGGER.error(`<= RabbitMQ Connect Error ${e}`)
    })
    self.subscriber = {}
  }

  /**
   * 添加消息队列监听
   * @private
   */
  _consume () {
    const distribute = (message) => {
      // 防止重复消费, 程序的错误将记录日志
      try {
        const content = message.content.toString()
        global.LOGGER.info(`<= Recive Queue [${message.fields.routingKey}] Message ${content}`)
        this.subscriber[message.fields.routingKey].forEach(callback => {
          callback(JSON.parse(content))
        })
      } catch (e) {
        global.LOGGER.error(e)
      } finally {
        // 接受消息并且确认
        this.channel.ack(message)
      }
    }
    this.consumer.forEach(consumer => {
      if (!this.subscriber[consumer.topic]) {
        this.channel.consume(consumer.topic, distribute)
        this.subscriber[consumer.topic] = []
      }
      this.subscriber[consumer.topic].push(consumer.callback)
    })
  }

  /**
   * 消费
   * @param topic
   * @param callback
   */
  subscribe (topic, callback) {
    this.consumer.push({ topic: topic, callback: callback })
  }

  /**
   * 推送
   * @param topic
   * @param message
   */
  publish (topic, message) {
    const stringMessage = JSON.stringify(message)
    this.channel.sendToQueue(topic, Buffer.from(stringMessage))
    global.LOGGER.info(`=> Publish Message ${stringMessage} `)
  }

  /**
   * 获取通信管道
   * @returns {any}
   */
  connection () {
    return this.channel
  }
}

复制代码

提供SDK

如果希望能够让其他服务或者系统 快乐

不报错

的调用,还是需要提供SDK的,提供SDK的好处有

  • 开箱即用,各种场景已经考虑到,并且依赖也已经整合

  • 完善的测试,为上面的有点提供支持

  • entity

public class Mail {
    /** 发件人 */
    private String from;
    /** 收件人 */
    private String to;
    /** 抄送 */
    private String cc;
    /** 秘抄 */
    private String bcc;
    /** 主题 */
    private String subject;
    /** 正文 */
    private String text;
    /** HTML正文 */
    private String html;
    public Mail() {
    }

    public Mail(String from, String to) {
        this.from = from;
        this.to = to;
    }

    public Mail(String from, String to, String subject) {
        this.from = from;
        this.to = to;
        this.subject = subject;
    }

    public Mail(String from, String to, String cc, String subject, String text) {
        this.from = from;
        this.to = to;
        this.cc = cc;
        this.subject = subject;
        this.text = text;
    }
    ...
}

复制代码
  • service
@FeignClient(name = "thunder-mercury")
public interface MercuryService extends IService {

    /**
     * 发送邮件
     *
     * @param mail 邮件实体
     * @return {@link BaseResponse}
     */
    @PostMapping(value = "/v1/mail/")
    BaseResponse<String> send(@RequestBody Mail mail);
    ...
}
复制代码

Node集成到微服务体系

原文  https://juejin.im/post/5efd638be51d45348424e06d
正文到此结束
Loading...