转载

go-kit微服务:服务注册与发现

在微服务架构下,原单体服务被拆分为多个微服务独立部署,客户端就无法知晓服务的具体位置;而且服务数量太多,维护如此多的服务地址,运维人员也无法高效工作。

因此,在微服务架构中引入了服务注册中心,用于接受和维护各个服务的地址信息。客户端或者网关可以通过注册中心查询目标服务地址,动态实现服务访问,并且在此实现服务负载均衡。

对于服务注册与发现,go-kit默认提供了对consul、zookeeper、etcd、eureka常用注册中心的支持。

概述

本文将基于consul,使用“客户端发现模式”进行实战演练,主要有以下要点:

  • consul使用docker镜像(progrium/consul)单机部署;
  • 算术服务以名称arithmetic注册至consul;
  • 编写发现服务通过consul查询服务实例,基于RoundRibbon进行负载均衡。

本文实例程序采用的思路为:算术服务注册至consul,其他部分保持不变;发现服务对外暴露http接口,接受请求后(接收请求内容存储在Body中,以json方式传递),按照go-kit的机制动态查询算术服务实例,调用算术服务的接口,然后将响应内容返回。如下图所示:

go-kit微服务:服务注册与发现

启动consul

  1. 修改 docker/docker-compose.yml ,如下所示(暂时注释了Prometheus和Grafana的部分)。
version: '2'

services:
  consul:
    image: progrium/consul:latest
    ports:
      - 8400:8400
      - 8500:8500
      - 8600:53/udp
    hostname: consulserver
    command: -server -bootstrap -ui-dir /ui
复制代码
  1. 启动docker。在终端切换至项目目录,执行以下命令:
sudo docker-compose -f docker/docker-compose.yml up
复制代码
  1. 通过浏览器访问 http://localhost:8500 ,出现以下界面即为启动成功。
go-kit微服务:服务注册与发现

服务注册

Step-1:代码准备

本示例基于 arithmetic_monitor_demo 代码进行改写。首先,复制该目录并重命名为 arithmetic_consul_demo ;新建两个目录,分别命名为 registerdiscover ;将原有 go 代码文件移动至 register 目录。结果如下图所示:

go-kit微服务:服务注册与发现

另外,需要下载所依赖的第三方库 uuidhashicorp/consul

go get github.com/pborman/uuid
go get github.com/hashicorp/consul
复制代码

Step-2:实现注册方法

新建 register/register.go ,添加 Register 方法,实现向consul的注册逻辑。该方法接收5个参数,分别是注册中心consul的ip、端口,算术服务的本地ip和端口,日志记录工具。

创建注册对象需要使用 hashicorp/consul ,查看代码可知其方法定义如下:

func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar
复制代码

所以 Register 的实现过程主要有三步:创建consul客户端对象;创建consul对算术服务健康检查的参数配置信息;创建算术服务向consul注册的服务配置信息。代码如下:

func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) {

	// 创建Consul客户端连接
	var client consul.Client
	{
		consulCfg := api.DefaultConfig()
		consulCfg.Address = consulHost + ":" + consulPort
		consulClient, err := api.NewClient(consulCfg)
		if err != nil {
			logger.Log("create consul client error:", err)
			os.Exit(1)
		}

		client = consul.NewClient(consulClient)
	}

	// 设置Consul对服务健康检查的参数
	check := api.AgentServiceCheck{
		HTTP:     "http://" + svcHost + ":" + svcPort + "/health",
		Interval: "10s",
		Timeout:  "1s",
		Notes:    "Consul check service health status.",
	}

	port, _ := strconv.Atoi(svcPort)

	//设置微服务想Consul的注册信息
	reg := api.AgentServiceRegistration{
		ID:      "arithmetic" + uuid.New(),
		Name:    "arithmetic",
		Address: svcHost,
		Port:    port,
		Tags:    []string{"arithmetic", "raysonxin"},
		Check:   ✓,
	}

	// 执行注册
	registar = consul.NewRegistrar(client, ®, logger)
	return
}
复制代码

Step-3:实现健康检查接口

Step-2 可知,consul将定时请求算术服务的 /heath 用于检查服务的健康状态,所以我们将从 serviceendpointtransport 中增加对应的实现。

  1. 在接口 Service 中新增接口方法 HealthCheck ,并依次在 ArithmeticServiceloggingMiddlewaremetricMiddleware 中添加实现。
// service接口
// Service Define a service interface
type Service interface {

	//省略之前的其他方法

	// HealthCheck check service health status
	HealthCheck() bool
}

// ArithmeticService实现HealthCheck
// HealthCheck implement Service method
// 用于检查服务的健康状态,这里仅仅返回true。
func (s ArithmeticService) HealthCheck() bool {
	return true
}

// loggingMiddleware实现HealthCheck
func (mw loggingMiddleware) HealthCheck() (result bool) {
	defer func(begin time.Time) {
		mw.logger.Log(
			"function", "HealthChcek",
			"result", result,
			"took", time.Since(begin),
		)
	}(time.Now())
	result = mw.Service.HealthCheck()
	return
}

// metricMiddleware实现HealthCheck
func (mw metricMiddleware) HealthCheck() (result bool) {

	defer func(begin time.Time) {
		lvs := []string{"method", "HealthCheck"}
		mw.requestCount.With(lvs...).Add(1)
		mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
	}(time.Now())

	result = mw.Service.HealthCheck()
	return
}
复制代码
  1. endpoints.go 中新增结构: ArithmeticEndpoints 。在之前的示例中,仅使用了一个endpoint,所以我直接使用了结构 endpoint.Endpoint 。定义如下:
// ArithmeticEndpoint define endpoint
type ArithmeticEndpoints struct {
	ArithmeticEndpoint  endpoint.Endpoint
	HealthCheckEndpoint endpoint.Endpoint
}
复制代码
  1. 创建健康检查的请求、响应对象,以及对应的 endpoint.Endpoint 封装方法。代码如下:
// HealthRequest 健康检查请求结构
type HealthRequest struct{}

// HealthResponse 健康检查响应结构
type HealthResponse struct {
	Status bool `json:"status"`
}

// MakeHealthCheckEndpoint 创建健康检查Endpoint
func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		status := svc.HealthCheck()
		return HealthResponse{status}, nil
	}
}
复制代码
  1. transports.go 中新增健康检查接口 /health
// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler {
	r := mux.NewRouter()

	//省略原有/calculate/{type}/{a}/{b}代码

	// create health check handler
	r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
		endpoints.HealthCheckEndpoint,
		decodeHealthCheckRequest,
		encodeArithmeticResponse,
		options...,
	))

	return r
}
复制代码

Step-4:修改main.go

接下来在 main.go 中增加健康检查和服务注册相关的调用代码,以便上述修改逻辑生效。

  1. 健康检查。
//创建健康检查的Endpoint,未增加限流
healthEndpoint := MakeHealthCheckEndpoint(svc)

//把算术运算Endpoint和健康检查Endpoint封装至ArithmeticEndpoints
endpts := ArithmeticEndpoints{
	ArithmeticEndpoint:  endpoint,
	HealthCheckEndpoint: healthEndpoint,
}

//创建http.Handler
r := MakeHttpHandler(ctx, endpts, logger)
复制代码
  1. 服务注册。准备服务需要的环境变量,创建注册对象,在服务启动前注册至consul,服务退出后从consul取消注册。下面只贴出部分代码,完整代码可从github获取。
// 定义环境变量
var (
	consulHost  = flag.String("consul.host", "", "consul ip address")
	consulPort  = flag.String("consul.port", "", "consul port")
	serviceHost = flag.String("service.host", "", "service ip address")
	servicePort = flag.String("service.port", "", "service port")
)
// parse
flag.Parse()

// ...

//创建注册对象
registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)

go func() {
	fmt.Println("Http Server start at port:" + *servicePort)
    //启动前执行注册
	registar.Register()
	handler := r
	errChan <- http.ListenAndServe(":"+*servicePort, handler)
}()

go func() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	errChan <- fmt.Errorf("%s", <-c)
}()

error := <-errChan
//服务退出,取消注册
registar.Deregister()
fmt.Println(error)
复制代码

Step-5:编译&运行

打开终端,切换至项目目录。执行 go build ./register 编译成功后,输入以下指令启动算术服务(注册服务):

./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000
复制代码

启动成功后,再次刷新 consul-ui 界面,看到如下界面即说明算术服务成功注册至consul。

go-kit微服务:服务注册与发现

同时也可以在注册服务运行的终端看到consul定时调用 /health 接口的日志输出信息:

go-kit微服务:服务注册与发现

服务发现

discover 服务要完成的工作为:以REST接口 /calculate 对外提供API服务,客户端使用HTTP POST方法发送json数据执行请求;在endpoint中查询已经在consul中注册的服务实例;然后选择合适的服务实例向其发起请求转发;完成请求后向原客户端请求响应。

查阅go-kit源码可知, kit/sd/Endpointer 提供了一套服务发现机制,其定义和创建接口如下所示:

// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer
复制代码

通过代码注释我们可以知道: Endpointer通过监听服务发现系统的事件信息,并且通过factory按需创建服务终结点( Endpoint )。

所以,我们需要通过 Endpointer 来实现服务发现功能。在微服务模式下,同一个服务可能存在多个实例,所以需要通过负载均衡机制完成实例选择,这里使用go-kit工具集中的 kit/sd/lb 组件(该组件实现RoundRibbon,并具备Retry功能)。

Step-1:创建factory

discover 目录中创建go文件 factory.go ,实现 sd.Factory 的逻辑,即把服务实例转换为endpoint,在该endpoint中实现对于目标服务的调用过程。这里直接针对算术运算服务进行封装,代码如下所示:

func arithmeticFactory(_ context.Context, method, path string) sd.Factory {
	return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
		if !strings.HasPrefix(instance, "http") {
			instance = "http://" + instance
		}

		tgt, err := url.Parse(instance)
		if err != nil {
			return nil, nil, err
		}
		tgt.Path = path

		var (
			enc kithttp.EncodeRequestFunc
			dec kithttp.DecodeResponseFunc
		)
		enc, dec = encodeArithmeticRequest, decodeArithmeticReponse

		return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
	}
}

func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error {
	arithReq := request.(ArithmeticRequest)
	p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B)
	req.URL.Path += p
	return nil
}

func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) {
	var response ArithmeticResponse
	var s map[string]interface{}

	if respCode := resp.StatusCode; respCode >= 400 {
		if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
			return nil, err
		}
		return nil, errors.New(s["error"].(string) + "/n")
	}

	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return nil, err
	}
	return response, nil
}
复制代码

Step-2:创建endpoint

创建go文件 discover/enpoints.go 。根据上述分析,在该endpoint实现对服务发现系统的监听,实现实例选择,最终返回可执行的 endpoint.Endpoint 。下面根据代码注释说明实现过程:

// MakeDiscoverEndpoint 使用consul.Client创建服务发现Endpoint
// 为了方便这里默认了一些参数
func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
	serviceName := "arithmetic"
	tags := []string{"arithmetic", "raysonxin"}
	passingOnly := true
	duration := 500 * time.Millisecond

	//基于consul客户端、服务名称、服务标签等信息,
	// 创建consul的连接实例,
	// 可实时查询服务实例的状态信息
	instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly)

	//针对calculate接口创建sd.Factory
	factory := arithmeticFactory(ctx, "POST", "calculate")

	//使用consul连接实例(发现服务系统)、factory创建sd.Factory
	endpointer := sd.NewEndpointer(instancer, factory, logger)

	//创建RoundRibbon负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	//为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint
	retry := lb.Retry(1, duration, balancer)

	return retry
}
复制代码

Step-3:创建transport

创建go文件 discover/transports.go 。通过 mux/Router 使用POST方法为发现服务开放REST接口 /calculate ,与算术服务一样,这里需要 endpoint.EndpointDecodeRequestFuncEncodeResponseFunc 。为了方便,我把算术服务中的请求与响应结构和编解码方法直接复制过来。代码如下所示:

func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler {
	r := mux.NewRouter()

	r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer(
		endpoint,
		decodeDiscoverRequest,
		encodeDiscoverResponse,
	))

	return r
}

// 省略实体结构和编解码方法
复制代码

Step-4:编写main方法

接下来就是在main方法把以上逻辑串起来,然后启动发现服务了,这里监听端口为9001。比较简单,直接贴代码了:

func main() {

	// 创建环境变量
	var (
		consulHost = flag.String("consul.host", "", "consul server ip address")
		consulPort = flag.String("consul.port", "", "consul server port")
	)
	flag.Parse()

	//创建日志组件
	var logger log.Logger
	{
		logger = log.NewLogfmtLogger(os.Stderr)
		logger = log.With(logger, "ts", log.DefaultTimestampUTC)
		logger = log.With(logger, "caller", log.DefaultCaller)
	}

	//创建consul客户端对象
	var client consul.Client
	{
		consulConfig := api.DefaultConfig()

		consulConfig.Address = "http://" + *consulHost + ":" + *consulPort
		consulClient, err := api.NewClient(consulConfig)

		if err != nil {
			logger.Log("err", err)
			os.Exit(1)
		}
		client = consul.NewClient(consulClient)
	}

	ctx := context.Background()

	//创建Endpoint
	discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger)

	//创建传输层
	r := MakeHttpHandler(discoverEndpoint)

	errc := make(chan error)
	go func() {
		c := make(chan os.Signal)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		errc <- fmt.Errorf("%s", <-c)
	}()

	//开始监听
	go func() {
		logger.Log("transport", "HTTP", "addr", "9001")
		errc <- http.ListenAndServe(":9001", r)
	}()

	// 开始运行,等待结束
	logger.Log("exit", <-errc)
}
复制代码

Step-5:编译&运行

在终端中切换至 discover 目录,执行 go build 完成编译,然后使用以下命令(指定注册中心服务地址)启动发现服务:

./discover -consul.host localhost -consul.port 8500
复制代码

请求测试

使用postman请求 http://localhost:9001/calculate ,在 body 中设置请求信息,完成测试。如下图所示:

go-kit微服务:服务注册与发现

总结

本文使用consul作为注册中心,通过实例演示了go-kit的服务注册与发现功能。由于本人在这个部分了解不够透彻,在编写代码和本文的过程中,一直在研究go-kit发现组件的设计方式,力求能够通过代码、文字解释清楚。本人水平有限,有任何错误或不妥之处,请大家批评指正。

本文实例代码见 arithmetic_consul_demo 。

本文首发于本人微信公众号【兮一昂吧】,欢迎扫码关注!

go-kit微服务:服务注册与发现
原文  https://juejin.im/post/5c740a335188257c1e2c86a7
正文到此结束
Loading...