序
本文主要研究一下nacos-sdk-go的PushReceiver
PushReceiver
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
type PushReceiver struct { port int host string hostReactor *HostReactor }
- PushReceiver定义了port、host、hostReactor属性
NewPushRecevier
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func NewPushRecevier(hostReactor *HostReactor) *PushReceiver {
pr := PushReceiver{
hostReactor: hostReactor,
}
go pr.startServer()
return &pr
}
- NewPushRecevier方法创建PushReceiver,并异步执行pr.startServer()
startServer
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) startServer() { var conn *net.UDPConn for i := 0; i < 3; i++ { r := rand.New(rand.NewSource(time.Now().UnixNano())) port := r.Intn(1000) + 54951 us.port = port conn1, ok := us.tryListen() if ok { conn = conn1 log.Println("[INFO] udp server start, port: " + strconv.Itoa(port)) break } if !ok && i == 2 { log.Panicf("failed to start udp server after trying 3 times.") //os.Exit(1) //It is weird dangerous to invoke the os.Exit() as a Middleware. } } defer conn.Close() for { us.handleClient(conn) } }
- startServer方法随机执行一个端口,然后执行us.tryListen(),三次不成功则退出,成功则执行us.handleClient(conn)
tryListen
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) tryListen() (*net.UDPConn, bool) {
addr, err := net.ResolveUDPAddr("udp", us.host+":"+strconv.Itoa(us.port))
if err != nil {
log.Printf("[ERROR]: Can't resolve address,err: %s /n", err.Error())
return nil, false
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Printf("Error listening %s:%d,err:%s /n", us.host, us.port, err.Error())
return nil, false
}
return conn, true
}
- tryListen方法先执行net.ResolveUDPAddr,然后执行net.ListenUDP("udp", addr)
handleClient
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) handleClient(conn *net.UDPConn) { data := make([]byte, 4024) n, remoteAddr, err := conn.ReadFromUDP(data) if err != nil { log.Printf("[ERROR]:failed to read UDP msg because of %s /n", err.Error()) return } s := utils.TryDecompressData(data[:n]) log.Println("[INFO] receive push: "+s+" from: ", remoteAddr) var pushData PushData err1 := json.Unmarshal([]byte(s), &pushData) if err1 != nil { log.Printf("[ERROR] failed to process push data.err:%s /n", err1.Error()) return } ack := make(map[string]string) if pushData.PushType == "dom" || pushData.PushType == "service" { us.hostReactor.ProcessServiceJson(pushData.Data) ack["type"] = "push-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = "" } else if pushData.PushType == "dump" { ack["type"] = "dump-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = utils.ToJsonString(us.hostReactor.serviceInfoMap) } else { ack["type"] = "unknow-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = "" } bs, _ := json.Marshal(ack) conn.WriteToUDP(bs, remoteAddr) }
- handleClient方法通过conn.ReadFromUDP(data)接受数据,然后通过utils.TryDecompressData解压,再通过json.Unmarshal([]byte(s), &pushData)解析为PushData,之后根据pushData.PushType构造ack数据,最后通过conn.WriteToUDP(bs, remoteAddr)响应回去
小结
PushReceiver定义了port、host、hostReactor属性;它提供了NewPushRecevier、startServer、handleClient等方法
doc
- push_receiver
原文
https://segmentfault.com/a/1190000023050550
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 聊聊nacos-sdk-go的PushReceiver