nsqlookupd.go Main里主要是侦听两个服务tcp 服务和http 服务
代码片段 github.com\nsqio\nsq\nsqlookupd\nsqlookupd.go
func (l *NSQLookupd) Main() { ctx := &Context{l} //监听tcp tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) if err != nil { l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err) os.Exit(1) } l.Lock() l.tcpListener = tcpListener l.Unlock() tcpServer := &tcpServer{ctx: ctx} //创建一个TCP的服务端,接受客户端连接(里面是一个死循环,也就是不断的去监听), 启动一个goroutine去执行tcpServer的Handle l.waitGroup.Wrap(func() { protocol.TCPServer(tcpListener, tcpServer, l.logf) }) //监听tcp httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) if err != nil { l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err) os.Exit(1) } l.Lock() l.httpListener = httpListener l.Unlock() //设置路由 httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { //创建一个http的服务端 http_api.Serve(httpListener, httpServer, "HTTP", l.logf) }) }
文件里主要是启动了两个线程,tcp服务线程和http服务线程
TCP里执行的Handle
代码片段 github.com\nsqio\nsq\nsqlookupd\tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) //客户端在初始化自己的时候,需要发送4字节的数据用来标识它自己所有使用协议版本。将来升级协议的时候可以避免使用旧协议的客户端不能使用。 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err) return } //获取协议内容 protocolMagic := string(buf) p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V1": //当前只支持" V1"协议(前两有两个空格,所以总共是4字节),协议在nsqlookupd\lookup_protocol_v1.go文件中定义,这里创建了LookupProtocolV1的实例 //LookupProtocolV1实现了Protocol接口 prot = &LookupProtocolV1{ctx: p.ctx} default: //如果不是" V1"协议,则协议出错,断开链接,返回。 protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } //如果是" V1"协议,这里就进入了LookupProtocolV1的IOLoop方法。此方法里有for死循环运行,直到出现error时,才会执行下面的代码。 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) return } }
开始执行IOLoop
代码片段 D:\go\src\github.com\nsqio\nsq\nsqlookupd\lookup_protocol_v1.go
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string //这个方法文件github.com\nsqio\nsq\nsqlookupd\client_v1.go,这里使用到了go的方法的继承,go继承列子:http://studygolang.com/articles/5351 ,继承了net.Conn,这里有 Read(b []byte) (n int, err error)的接口,所以在下面bufio.NewReader使用的时候不会出错,NewReader 参数为io.Reader,其实也是Read(p []byte) (n int, err error)的一个接口 client := NewClientV1(conn) reader := bufio.NewReader(client) for { //ReadString读取直到第一次遇到delim字节,这里的delim为\n(换行符)那就是,每次读取一行 line, err = reader.ReadString('\n') if err != nil { break } //去掉前后两端的空格 line = strings.TrimSpace(line) //以空格进行分割,返回的是切片类似于php的explode函数 params := strings.Split(line, " ") var response []byte //执行LookupProtocolV1的Exec的方法,这个方法主要是根据每行数据的第一个参数,调用不同的方法 response, err = p.Exec(client, reader, params) //执行Exec有错误 if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } //如果Exec返回有数据就执行SendResponse发送到客户端 if response != nil { _, err = protocol.SendResponse(client, response) if err != nil { break } } } conn.Close() p.ctx.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err }