跳至正文

理解 Go 中 netpoll 的工作机制

一般我们聊到 netpoll 时,是指 Go runtime 中借助于epoll对套接字进行批量监听、数据到来时唤醒特定goroutine的机制。对应的代码存放在runtime/netpoll.go 和 runtime/netpoll_epoll.go (只考虑linux) 中。为此 runtime 提供了两大类函数:

第一类:调用方是 Go Runtime

  1. netpoll: 检查有事件发生的套接字,并返回处于pdReady状态的goroutine列表,基于epoll_wait;
  2. netpollBreak: 向 netpollBreakWr 写入一个字节数据,通过管道传到 netpollBreakRd,epoll_wait 监听到read pipe上的event,立即返回;

第二类:调用方是internal/poll、net、net/http等

  1. poll_runtime_pollServerInit(netpollGenericInit): 初始化poller,基于epoll_create1
  2. poll_runtime_pollOpen: 将套接字添加到监听列表,基于 epoll_ctl
  3. poll_runtime_pollWait: 等待套接字上的事件,可以休眠(gopark)当前goroutine, 借助于netpollblock函数
  4. poll_runtime_pollUnblock: 使用Unblock模式进行poll
  5. poll_runtime_pollClose: 将套接字从监听列表删除,基于 epoll_ctl
  6. poll_runtime_pollReset: nonblock模式下 prepareRead/prepareWrite 使用

这些函数都会被link到 internal/poll.runtime_xxx, xxx 可以是 runtime_pollServerInit/runtime_pollOpen等。

后面我们挑一些主要的函数来说一下。

netpollGenericInit 初始化 poller

netpollGenericInit 保证 poller 被初始化,原子变量netpollInited保证其仅被初始化一次。

func netpollGenericInit() {
  if atomic.Load(&netpollInited) == 0 {
    lockInit(&netpollInitLock, lockRankNetpollInit)
    lock(&netpollInitLock)
    if netpollInited == 0 {
      netpollinit()
      atomic.Store(&netpollInited, 1)
    }
    unlock(&netpollInitLock)
  }
}

这个函数只是一个壳,初始化逻辑封装在netpollinit函数中,依赖于平台具体的实现。linux下,init的逻辑是:

  1. 通过epoll_create1系统调用创建 epoll fd
  2. 创建一对 read/write pipe。pipe的一个特性是向 write pipe写入数据,read pipe 就能收到同样的数据
  3. 通过epoll_ctl将 write pipe 对应的fd 加入到监听列表

单独创建一对pipe后,runtime就能够按需中断epoll_wait,让netpoll函数立即返回。

func netpollinit() {
  epfd = epollcreate1(_EPOLL_CLOEXEC)
  if epfd < 0 {
    epfd = epollcreate(1024)
    if epfd < 0 {
      println("runtime: epollcreate failed with", -epfd)
      throw("runtime: netpollinit failed")
    }
    closeonexec(epfd)
  }
  r, w, errno := nonblockingPipe()
  if errno != 0 {
    println("runtime: pipe failed with", -errno)
    throw("runtime: pipe failed")
  }
  ev := epollevent{
    events: _EPOLLIN,
  }
  *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
  errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
  if errno != 0 {
    println("runtime: epollctl failed with", -errno)
    throw("runtime: epollctl failed")
  }
  netpollBreakRd = uintptr(r)
  netpollBreakWr = uintptr(w)
}

netpoll函数

netpoll函数的功能是检查可用的网络连接,它的工作流程是(happy path):

  1. 创建size=128的epollevent数组, 以接收事件
  2. 调用epollwait等待事件: 依赖epoll_wait系统调用
  3. 遍历epoll events,对于每个event
    • 创建一个pollDesc对象
    • 调用netpollready,找到对应的goroutine,并将其状态从pdWait修改为pdReady
  4. 返回pdReady状态的 goroutine列表 (gList)

struct pollDesc中包含两个信号量字段,可以表示四种状态:

  1. pdReady: io ready信号等待被接收,goroutine可以消费这个信号,逻辑上是把信号量改成nil
  2. pdWait: goroutine已经准备好在该信号量上阻塞,但还没有阻塞;
    • 如果goroutine通过gopark阻塞,状态会变成G pointer
    • 如果并发的io ready信号到达,状态会改成pdReady
    • 如果并发的timeout/close信号到达,状态会被改成nil
  3. G pointer: goroutine被阻塞在信号量上,可以被下面两类事件唤醒:
    • io ready信号到来时,状态被修改好pdReady
    • timeout/close信号到来时,状态被修改为nil
  4. nil: 不是上面三种状态

对应一些辅助函数:

  • netpollblock 函数将goroutine状态从 pdReady 转化成 pdWait,并gopark当前goroutine
  • netpollunblock 函数将goroutine状态从 pdWait 转换为 pdReady 或 nil

netpoll函数的代码在runtime/netpoll_epoll.go中,部分代码如下:

func netpoll(delay int64) gList {
  // epoll fd 为-1,说明不需要poll
  if epfd == -1 {
    return gList{}
  }
  var waitms int32
  // ...省略一段代码
  var events [128]epollevent
retry:
  n := epollwait(epfd, &events[0], int32(len(events)), waitms)
  if n < 0 {
    if n != -_EINTR {
      println("runtime: epollwait on fd", epfd, "failed with", -n)
      throw("runtime: netpoll failed")
    }
    // If a timed sleep was interrupted, just return to
    // recalculate how long we should sleep now.
    if waitms > 0 {
      return gList{}
    }
    goto retry
  }
  var toRun gList
  for i := int32(0); i < n; i++ {
    ev := &events[i]
    if ev.events == 0 {
      continue
    }

    if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
      // ... read pipe 有数据
      // 不需要唤醒任何goroutine
    }

    var mode int32
    if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
      mode += 'r'
    }
    if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
      mode += 'w'
    }
    if mode != 0 {
      pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
      pd.setEventErr(ev.events == _EPOLLERR)
      // 将goroutine置为 pdReady
      // 并添加到toRun *gList
      netpollready(&toRun, pd, mode)
    }
  }
  return toRun
}

备注: netpollready 函数借助于netpollunblock修改goroutine状态,并将其加到 io ready 的 goroutine list。

runtime在调用 netpoll 时,通常采用的是 nonblock 模式(delay=0), 只有在 findrunnable 的最后一个环节,会检查是否有单独的M(GMP中的M)进行net polling,如果没有,会block等待delay参数指定的时间。

netpollBreak 函数

netpollBreak函数的功能比较简单,但实现比较有意思。它和netpoll函数通过变量netpollWakeSig进行交互,由于在不同的goroutine中,所以对于该变量的操作都是原则操作。

// netpollBreak interrupts an epollwait.
func netpollBreak() {
  if atomic.Cas(&netpollWakeSig, 0, 1) {
    for {
      var b byte
      n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
      if n == 1 {
        break
      }
      if n == -_EINTR {
        continue
      }
      if n == -_EAGAIN {
        return
      }
      println("runtime: netpollBreak write failed with", -n)
      throw("runtime: netpollBreak write failed")
    }
  }
}

poll_runtime_pollOpen 函数

poll_runtime_pollOpen 的逻辑分为三块:

  1. 给 pollDesc 分配内存
  2. 初始化 pollDesc 对象
  3. 借助于 netpollopen 注册epoll监听(netpollopen在linux下是 epoll_ctl)
  4. 返回 pollDesc 对象

poll_runtime_pollOpen函数的实现位于 runtime/netpoll.go 中, 主要逻辑如下:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  pd := pollcache.alloc()
  lock(&pd.lock)
  wg := pd.wg.Load()
  if wg != 0 && wg != pdReady {
    throw("runtime: blocked write on free polldesc")
  }
  rg := pd.rg.Load()
  if rg != 0 && rg != pdReady {
    throw("runtime: blocked read on free polldesc")
  }
  pd.fd = fd
  // ... 省略部分初始化逻辑
  unlock(&pd.lock)

  errno := netpollopen(fd, pd)
  if errno != 0 {
    pollcache.free(pd)
    return nil, int(errno)
  }
  return pd, 0
}

// 位于net/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
  var ev epollevent
  ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
  *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
  return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

poll_runtime_pollWait 函数

poll_runtime_pollWait 函数只是对 netpollblock 函数的封装,增加了容错。值得注意的是,该函数不是runtime触发的,而是用户程序触发的。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
  errcode := netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
    return errcode
  }
  // As for now only Solaris, illumos, and AIX use level-triggered IO.
  if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
    netpollarm(pd, mode)
  }
  for !netpollblock(pd, int32(mode), false) {
    errcode = netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
      return errcode
    }
    // Can happen if timeout has fired and unblocked us,
    // but before we had a chance to run, timeout has been reset.
    // Pretend it has not happened and retry.
  }
  return pollNoError
}

下面我们看下用户程序如何触发 poll_runtime_xxx 系列的函数。首先,套接字分为两类:LISTEN套接字(Server套接字) 和 ESTABLISHED套接字(TCPConn);

  • LISTEN 套接字通过系统调用 socket/bind/listen 去生成;
  • ESTABLISHED 套接字通过系统调用 accept 去生成;

LISTEN套接字(Server套接字)

从http server的角度来看,LISTEN套接字注册epoll监听的链路如下:

// net/http/server.go
func ListenAndServe(addr string, handler Handler) error

// net/http/server.go
func (srv *Server) ListenAndServe() error

// net/dial.go
func Listen(network, address string) (Listener, error) {
  var lc ListenConfig
  return lc.Listen(context.Background(), network, address)
}

// net/dial.go
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error)

// net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error)

// net/ipsock_posix.go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) 

// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error)

// net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error)
  if err = fd.init(); err != nil {
    return err
  }

// net/fd_unix.go
func (fd *netFD) init() error {
  // fd.pfd 类型是 poll.FD
  return fd.pfd.Init(fd.net, true)
}

// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error {
  // We don't actually care about the various network types.
  if net == "file" {
    fd.isFile = true
  }
  if !pollable {
    fd.isBlocking = 1
    return nil
  }
  err := fd.pd.init(fd)
  if err != nil {
    // If we could not initialize the runtime poller,
    // assume we are using blocking mode.
    fd.isBlocking = 1
  }
  return err
}

// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
  serverInit.Do(runtime_pollServerInit)
  ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  if errno != 0 {
    return errnoErr(syscall.Errno(errno))
  }
  pd.runtimeCtx = ctx
  return nil
}

ESTABLISHED套接字(TCPConn)

http server accept 新的tcp conn

// net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
  for {
    rw, err := l.Accept()

// net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error)

func (ln *TCPListener) accept() (*TCPConn, error) {
  fd, err := ln.fd.accept()

// net/fd_posix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
  d, rsa, errcall, err := fd.pfd.Accept()
  // 省略部分代码
  if err = netfd.init(); err != nil
  // 省略部分代码


// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error

// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error

关于 net.netFD struct

netFD是对套接字(网络文件描述符)的封装。对于Server套接字而言,可以通过accept方法从Server套接字(LISTEN套接字)获取新的TCP连接(或ESTABLISHED套接字)。Linux的accept系统调用返回的ESTABLISHED套接字是一个int值,通过 newFD 和 init 函数将其封装为一个完整的 netFD结构,后面会被封装为一个net.TCPConn。

对于操作系统而言,LISTEN套接字和ESTABLISHED套接字都只是一个int类型的文件描述符,没有本质区别。系统调用accept和read都是从套接字读取数据,所以epoll里会放到一个batch里去监听。

这是 netFD 的定义和accept方法的实现:

// Network file descriptor.
type netFD struct {
  pfd poll.FD

  // immutable until Close
  family      int
  sotype      int
  isConnected bool // handshake completed or use of association with peer
  net         string
  laddr       Addr
  raddr       Addr
}

func (fd *netFD) accept() (netfd *netFD, err error) {
  d, rsa, errcall, err := fd.pfd.Accept()
  if err != nil {
    if errcall != "" {
      err = wrapSyscallError(errcall, err)
    }
    return nil, err
  }

  if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
    poll.CloseFunc(d)
    return nil, err
  }
  if err = netfd.init(); err != nil {
    netfd.Close()
    return nil, err
  }
  lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
  netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
  return netfd, nil
}

net.netFD 依赖 poll.FD 实现poll功能。区别正如名字所展示,net.netFD是封装了网络相关的功能,而 poll.FD是更为通用的FD,封装了文件描述符上能进行的操作。其定义如下

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
  // Lock sysfd and serialize access to Read and Write methods.
  fdmu fdMutex

  // System file descriptor. Immutable until Close.
  Sysfd int

  // I/O poller.
  pd pollDesc

  // Writev cache.
  iovecs *[]syscall.Iovec

  // Semaphore signaled when file is closed.
  csema uint32

  // Non-zero if this file has been set to blocking mode.
  isBlocking uint32

  // Whether this is a streaming descriptor, as opposed to a
  // packet-based descriptor like a UDP socket. Immutable.
  IsStream bool

  // Whether a zero byte read indicates EOF. This is false for a
  // message based socket connection.
  ZeroReadIsEOF bool

  // Whether this is a file rather than a network socket.
  isFile bool
}

poll.FD 依赖 poll.pollDesc 实现poll功能。poll.pollDesc 实现了 IO polling 的功能。poll.pollDesc 有一系列的方法,比如 init、wait、close、prepare 等都是对 runtime_pollXXX 函数系列的封装,下面诗pollDesc的部分逻辑:

type pollDesc struct {
  runtimeCtx uintptr
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
  serverInit.Do(runtime_pollServerInit)
  ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  if errno != 0 {
    return errnoErr(syscall.Errno(errno))
  }
  pd.runtimeCtx = ctx
  return nil
}

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注