使用 etcd 实现服务注册与发现,主要用到三个功能:KV 存储、租约(lease)和 watch 机制。
服务启动时先向 etcd 注册自己(其实就是写入一个键值对),然后定期向 etcd 发送心跳(续租);当 etcd 在租约有效期内没有收到心跳时,就会把这个键值对删除(这整个过程就是 etcd 的租约机制)。网关那边只需要 watch 这个 key,就能够知道所有服务的上下线动态。注册时可以给 key 加上统一的前缀,这样在 watch 的时候按前缀监听,就可以 watch 到所有的服务器。
// Service registration listing: writes a key into etcd under a lease and
// keeps renewing that lease for as long as the process runs.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// ServiceRegister registers a service in etcd by writing a key that is bound
// to a lease, and keeps that lease alive in the background.
type ServiceRegister struct {
	etcdClient *clientv3.Client             // etcd client
	lease      clientv3.Lease               // lease API handle
	leaseResp  *clientv3.LeaseGrantResponse // response of the lease grant; holds the lease ID
	canclefunc func()                       // cancels the keep-alive context
	// keepAliveChan receives the keep-alive responses for the lease.
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	key           string // registered key; not populated by the code in this listing
}

// NewServiceRegister connects to the etcd endpoints in addr, grants a lease
// with a TTL of timeNum seconds, starts keep-alive for it and launches a
// goroutine that drains the keep-alive responses.
//
// It returns an error if the client cannot be created or the lease cannot be
// set up; in the latter case the client is closed before returning.
func NewServiceRegister(addr []string, timeNum int64) (*ServiceRegister, error) {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   addr,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	ser := &ServiceRegister{etcdClient: client}

	// Grant the lease and start keep-alive.
	if err := ser.setLease(timeNum); err != nil {
		// Best-effort cleanup: the setup error is the one worth reporting.
		_ = client.Close()
		return nil, err
	}

	// Drain keep-alive responses in the background.
	go ser.ListenLeaseRespChan()
	return ser, nil
}

// setLease grants a lease with a TTL of timeNum seconds and starts the
// periodic keep-alive for it. On success the lease handle, the grant
// response, the cancel function and the keep-alive channel are stored on s.
func (s *ServiceRegister) setLease(timeNum int64) error {
	lease := clientv3.NewLease(s.etcdClient)

	leaseResp, err := lease.Grant(context.TODO(), timeNum)
	if err != nil {
		return err
	}

	// The keep-alive runs until this context is cancelled (see RevokeLease).
	ctx, cancelFunc := context.WithCancel(context.TODO())
	leaseRespChan, err := lease.KeepAlive(ctx, leaseResp.ID)
	if err != nil {
		cancelFunc() // do not leak the context when keep-alive cannot start
		return err
	}

	s.lease = lease
	s.leaseResp = leaseResp
	s.canclefunc = cancelFunc
	s.keepAliveChan = leaseRespChan
	return nil
}

// ListenLeaseRespChan consumes keep-alive responses, printing a line for each
// successful renewal. It returns after printing a final message once the
// channel is closed or yields a nil response.
func (s *ServiceRegister) ListenLeaseRespChan() {
	for resp := range s.keepAliveChan {
		if resp == nil {
			break
		}
		fmt.Printf("续租成功\n")
	}
	fmt.Printf("已经关闭续租功能\n")
}

// PutService registers the service by writing key=val bound to the lease
// held by s.
func (s *ServiceRegister) PutService(key, val string) error {
	kv := clientv3.NewKV(s.etcdClient)
	_, err := kv.Put(context.TODO(), key, val, clientv3.WithLease(s.leaseResp.ID))
	return err
}

// RevokeLease cancels the keep-alive context, waits two seconds and then
// revokes the lease.
func (s *ServiceRegister) RevokeLease() error {
	s.canclefunc()
	time.Sleep(2 * time.Second)
	_, err := s.lease.Revoke(context.TODO(), s.leaseResp.ID)
	return err
}

func main() {
	ser, err := NewServiceRegister([]string{"127.0.0.1:2379"}, 5)
	if err != nil {
		log.Fatalf("create service register: %v", err)
	}
	if err := ser.PutService("/server/node1", "node1"); err != nil {
		log.Fatalf("register service: %v", err)
	}
	// Block forever so the keep-alive keeps running.
	select {}
}
import ( "go.etcd.io/etcd/clientv3" "time" "context" "go.etcd.io/etcd/mvcc/mvccpb" "sync" "log" ) type ServiceDiscovery struct { client *clientv3.Client serverList map[string]string lock sync.Mutex } func NewServiceDiscovery (addr []string)( *ServiceDiscovery, error){ conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } if client, err := clientv3.New(conf); err == nil { return &ClientDis{ client:client, serverList:make(map[string]string), }, nil } else { return nil ,err } } func (this * ServiceDiscovery) GetService(prefix string) ([]string ,error){ //使用key前桌获取所有的etcd上所有的server resp, err := this.client.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return nil, err } //解析出所有的server放入本地 addrs := this.extractAddrs(resp) //warch server前缀 将变更写入本地 go this.watcher(prefix) return addrs ,nil } // 监听key前缀 func (this *ServiceDiscovery) watcher(prefix string) { //监听 返回监听事件chan rch := this.client.Watch(context.Background(), prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //修改或者新增 this.SetServiceList(string(ev.Kv.Key),string(ev.Kv.Value)) case mvccpb.DELETE: //删除 this.DelServiceList(string(ev.Kv.Key)) } } } } func (this *ServiceDiscovery) extractAddrs(resp *clientv3.GetResponse) []string { addrs := make([]string,0) if resp == nil || resp.Kvs == nil { return addrs } for i := range resp.Kvs { if v := resp.Kvs[i].Value; v != nil { this.SetServiceList(string(resp.Kvs[i].Key),string(resp.Kvs[i].Value)) addrs = append(addrs, string(v)) } } return addrs } func (this *ServiceDiscovery) SetServiceList(key,val string) { this.lock.Lock() defer this.lock.Unlock() this.serverList[key] = string(val) log.Println("set data key :",key,"val:",val) } func (this *ServiceDiscovery) DelServiceList(key string) { this.lock.Lock() defer this.lock.Unlock() delete(this.serverList,key) log.Println("del data key:", key) } func (this *ServiceDiscovery) SerList2Array()[]string { 
this.lock.Lock() defer this.lock.Unlock() addrs := make([]string,0) for _, v := range this.serverList { addrs = append(addrs,v) } return addrs } func main () { cli,_ := NewServiceDiscovery([]string{"127.0.0.1:2379"}) cli.GetService("/server") select {} }