在微服务架构下,原单体服务被拆分为多个微服务独立部署,客户端就无法知晓服务的具体位置;而且服务数量太多,维护如此多的服务地址,运维人员也无法高效工作。
因此,在微服务架构中引入了服务注册中心,用于接受和维护各个服务的地址信息。客户端或者网关可以通过注册中心查询目标服务地址,动态实现服务访问,并且在此实现服务负载均衡。
对于服务注册与发现,go-kit默认提供了对consul、zookeeper、etcd、eureka常用注册中心的支持。
本文将基于consul,使用“客户端发现模式”进行实战演练,主要有以下要点:
本文实例程序采用的思路为:算术服务注册至consul,其他部分保持不变;发现服务对外暴露http接口,接受请求后(接收请求内容存储在Body中,以json方式传递),按照go-kit的机制动态查询算术服务实例,调用算术服务的接口,然后将响应内容返回。如下图所示:
docker/docker-compose.yml
,如下所示(暂时注释了Prometheus和Grafana的部分)。 version: '2' services: consul: image: progrium/consul:latest ports: - 8400:8400 - 8500:8500 - 8600:53/udp hostname: consulserver command: -server -bootstrap -ui-dir /ui 复制代码
sudo docker-compose -f docker/docker-compose.yml up 复制代码
http://localhost:8500
,出现以下界面即为启动成功。 本示例基于 arithmetic_monitor_demo
代码进行改写。首先,复制该目录并重命名为 arithmetic_consul_demo
;新建两个目录,分别命名为 register
、 discover
;将原有 go
代码文件移动至 register
目录。结果如下图所示:
另外,需要下载所依赖的第三方库 uuid
和 hashicorp/consul
go get github.com/pborman/uuid go get github.com/hashicorp/consul 复制代码
新建 register/register.go
,添加 Register
方法,实现向consul的注册逻辑。该方法接收5个参数,分别是注册中心consul的ip、端口,算术服务的本地ip和端口,日志记录工具。
创建注册对象需要使用 hashicorp/consul
,查看代码可知其方法定义如下:
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar 复制代码
所以 Register
的实现过程主要有三步:创建consul客户端对象;创建consul对算术服务健康检查的参数配置信息;创建算术服务向consul注册的服务配置信息。代码如下:
func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) { // 创建Consul客户端连接 var client consul.Client { consulCfg := api.DefaultConfig() consulCfg.Address = consulHost + ":" + consulPort consulClient, err := api.NewClient(consulCfg) if err != nil { logger.Log("create consul client error:", err) os.Exit(1) } client = consul.NewClient(consulClient) } // 设置Consul对服务健康检查的参数 check := api.AgentServiceCheck{ HTTP: "http://" + svcHost + ":" + svcPort + "/health", Interval: "10s", Timeout: "1s", Notes: "Consul check service health status.", } port, _ := strconv.Atoi(svcPort) //设置微服务想Consul的注册信息 reg := api.AgentServiceRegistration{ ID: "arithmetic" + uuid.New(), Name: "arithmetic", Address: svcHost, Port: port, Tags: []string{"arithmetic", "raysonxin"}, Check: ✓, } // 执行注册 registar = consul.NewRegistrar(client, ®, logger) return } 复制代码
由 Step-2
可知,consul将定时请求算术服务的 /heath
用于检查服务的健康状态,所以我们将从 service
、 endpoint
、 transport
中增加对应的实现。
Service
中新增接口方法 HealthCheck
,并依次在 ArithmeticService
、 loggingMiddleware
、 metricMiddleware
中添加实现。 // service接口 // Service Define a service interface type Service interface { //省略之前的其他方法 // HealthCheck check service health status HealthCheck() bool } // ArithmeticService实现HealthCheck // HealthCheck implement Service method // 用于检查服务的健康状态,这里仅仅返回true。 func (s ArithmeticService) HealthCheck() bool { return true } // loggingMiddleware实现HealthCheck func (mw loggingMiddleware) HealthCheck() (result bool) { defer func(begin time.Time) { mw.logger.Log( "function", "HealthChcek", "result", result, "took", time.Since(begin), ) }(time.Now()) result = mw.Service.HealthCheck() return } // metricMiddleware实现HealthCheck func (mw metricMiddleware) HealthCheck() (result bool) { defer func(begin time.Time) { lvs := []string{"method", "HealthCheck"} mw.requestCount.With(lvs...).Add(1) mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds()) }(time.Now()) result = mw.Service.HealthCheck() return } 复制代码
endpoints.go
中新增结构: ArithmeticEndpoints
。在之前的示例中,仅使用了一个endpoint,所以我直接使用了结构 endpoint.Endpoint
。定义如下: // ArithmeticEndpoint define endpoint type ArithmeticEndpoints struct { ArithmeticEndpoint endpoint.Endpoint HealthCheckEndpoint endpoint.Endpoint } 复制代码
endpoint.Endpoint
封装方法。代码如下: // HealthRequest 健康检查请求结构 type HealthRequest struct{} // HealthResponse 健康检查响应结构 type HealthResponse struct { Status bool `json:"status"` } // MakeHealthCheckEndpoint 创建健康检查Endpoint func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { status := svc.HealthCheck() return HealthResponse{status}, nil } } 复制代码
transports.go
中新增健康检查接口 /health
。 // MakeHttpHandler make http handler use mux func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler { r := mux.NewRouter() //省略原有/calculate/{type}/{a}/{b}代码 // create health check handler r.Methods("GET").Path("/health").Handler(kithttp.NewServer( endpoints.HealthCheckEndpoint, decodeHealthCheckRequest, encodeArithmeticResponse, options..., )) return r } 复制代码
接下来在 main.go
中增加健康检查和服务注册相关的调用代码,以便上述修改逻辑生效。
//创建健康检查的Endpoint,未增加限流 healthEndpoint := MakeHealthCheckEndpoint(svc) //把算术运算Endpoint和健康检查Endpoint封装至ArithmeticEndpoints endpts := ArithmeticEndpoints{ ArithmeticEndpoint: endpoint, HealthCheckEndpoint: healthEndpoint, } //创建http.Handler r := MakeHttpHandler(ctx, endpts, logger) 复制代码
// 定义环境变量 var ( consulHost = flag.String("consul.host", "", "consul ip address") consulPort = flag.String("consul.port", "", "consul port") serviceHost = flag.String("service.host", "", "service ip address") servicePort = flag.String("service.port", "", "service port") ) // parse flag.Parse() // ... //创建注册对象 registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger) go func() { fmt.Println("Http Server start at port:" + *servicePort) //启动前执行注册 registar.Register() handler := r errChan <- http.ListenAndServe(":"+*servicePort, handler) }() go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) errChan <- fmt.Errorf("%s", <-c) }() error := <-errChan //服务退出,取消注册 registar.Deregister() fmt.Println(error) 复制代码
打开终端,切换至项目目录。执行 go build ./register
编译成功后,输入以下指令启动算术服务(注册服务):
./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000 复制代码
启动成功后,再次刷新 consul-ui
界面,看到如下界面即说明算术服务成功注册至consul。
同时也可以在注册服务运行的终端看到consul定时调用 /health
接口的日志输出信息:
discover
服务要完成的工作为:以REST接口 /calculate
对外提供API服务,客户端使用HTTP POST方法发送json数据执行请求;在endpoint中查询已经在consul中注册的服务实例;然后选择合适的服务实例向其发起请求转发;完成请求后向原客户端请求响应。
查阅go-kit源码可知, kit/sd/Endpointer
提供了一套服务发现机制,其定义和创建接口如下所示:
// Endpointer listens to a service discovery system and yields a set of // identical endpoints on demand. An error indicates a problem with connectivity // to the service discovery system, or within the system itself; an Endpointer // may yield no endpoints without error. type Endpointer interface { Endpoints() ([]endpoint.Endpoint, error) } // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer // keeps returning previously created Endpoints assuming they are still good, unless // this behavior is disabled via InvalidateOnError option. func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer 复制代码
通过代码注释我们可以知道: Endpointer通过监听服务发现系统的事件信息,并且通过factory按需创建服务终结点( Endpoint
)。
所以,我们需要通过 Endpointer
来实现服务发现功能。在微服务模式下,同一个服务可能存在多个实例,所以需要通过负载均衡机制完成实例选择,这里使用go-kit工具集中的 kit/sd/lb
组件(该组件实现RoundRibbon,并具备Retry功能)。
在 discover
目录中创建go文件 factory.go
,实现 sd.Factory
的逻辑,即把服务实例转换为endpoint,在该endpoint中实现对于目标服务的调用过程。这里直接针对算术运算服务进行封装,代码如下所示:
func arithmeticFactory(_ context.Context, method, path string) sd.Factory { return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) { if !strings.HasPrefix(instance, "http") { instance = "http://" + instance } tgt, err := url.Parse(instance) if err != nil { return nil, nil, err } tgt.Path = path var ( enc kithttp.EncodeRequestFunc dec kithttp.DecodeResponseFunc ) enc, dec = encodeArithmeticRequest, decodeArithmeticReponse return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil } } func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error { arithReq := request.(ArithmeticRequest) p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B) req.URL.Path += p return nil } func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) { var response ArithmeticResponse var s map[string]interface{} if respCode := resp.StatusCode; respCode >= 400 { if err := json.NewDecoder(resp.Body).Decode(&s); err != nil { return nil, err } return nil, errors.New(s["error"].(string) + "/n") } if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return nil, err } return response, nil } 复制代码
创建go文件 discover/enpoints.go
。根据上述分析,在该endpoint实现对服务发现系统的监听,实现实例选择,最终返回可执行的 endpoint.Endpoint
。下面根据代码注释说明实现过程:
// MakeDiscoverEndpoint 使用consul.Client创建服务发现Endpoint // 为了方便这里默认了一些参数 func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint { serviceName := "arithmetic" tags := []string{"arithmetic", "raysonxin"} passingOnly := true duration := 500 * time.Millisecond //基于consul客户端、服务名称、服务标签等信息, // 创建consul的连接实例, // 可实时查询服务实例的状态信息 instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly) //针对calculate接口创建sd.Factory factory := arithmeticFactory(ctx, "POST", "calculate") //使用consul连接实例(发现服务系统)、factory创建sd.Factory endpointer := sd.NewEndpointer(instancer, factory, logger) //创建RoundRibbon负载均衡器 balancer := lb.NewRoundRobin(endpointer) //为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint retry := lb.Retry(1, duration, balancer) return retry } 复制代码
创建go文件 discover/transports.go
。通过 mux/Router
使用POST方法为发现服务开放REST接口 /calculate
,与算术服务一样,这里需要 endpoint.Endpoint
、 DecodeRequestFunc
、 EncodeResponseFunc
。为了方便,我把算术服务中的请求与响应结构和编解码方法直接复制过来。代码如下所示:
func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler { r := mux.NewRouter() r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer( endpoint, decodeDiscoverRequest, encodeDiscoverResponse, )) return r } // 省略实体结构和编解码方法 复制代码
接下来就是在main方法把以上逻辑串起来,然后启动发现服务了,这里监听端口为9001。比较简单,直接贴代码了:
func main() { // 创建环境变量 var ( consulHost = flag.String("consul.host", "", "consul server ip address") consulPort = flag.String("consul.port", "", "consul server port") ) flag.Parse() //创建日志组件 var logger log.Logger { logger = log.NewLogfmtLogger(os.Stderr) logger = log.With(logger, "ts", log.DefaultTimestampUTC) logger = log.With(logger, "caller", log.DefaultCaller) } //创建consul客户端对象 var client consul.Client { consulConfig := api.DefaultConfig() consulConfig.Address = "http://" + *consulHost + ":" + *consulPort consulClient, err := api.NewClient(consulConfig) if err != nil { logger.Log("err", err) os.Exit(1) } client = consul.NewClient(consulClient) } ctx := context.Background() //创建Endpoint discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger) //创建传输层 r := MakeHttpHandler(discoverEndpoint) errc := make(chan error) go func() { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) errc <- fmt.Errorf("%s", <-c) }() //开始监听 go func() { logger.Log("transport", "HTTP", "addr", "9001") errc <- http.ListenAndServe(":9001", r) }() // 开始运行,等待结束 logger.Log("exit", <-errc) } 复制代码
在终端中切换至 discover
目录,执行 go build
完成编译,然后使用以下命令(指定注册中心服务地址)启动发现服务:
./discover -consul.host localhost -consul.port 8500 复制代码
使用postman请求 http://localhost:9001/calculate
,在 body
中设置请求信息,完成测试。如下图所示:
本文使用consul作为注册中心,通过实例演示了go-kit的服务注册与发现功能。由于本人在这个部分了解不够透彻,在编写代码和本文的过程中,一直在研究go-kit发现组件的设计方式,力求能够通过代码、文字解释清楚。本人水平有限,有任何错误或不妥之处,请大家批评指正。
本文实例代码见 arithmetic_consul_demo 。
本文首发于本人微信公众号【兮一昂吧】,欢迎扫码关注!