
Go language I/O multiplexing: the netpoller model

luozhiyun · 2021-02-08 21:01:53

Please credit the source when reprinting. This article was first published on luozhiyun's blog: https://www.luozhiyun.com

This article is based on Go source code version 1.15.7.

From the Go source directory structure and the corresponding code files, you can see how Go implements its network I/O model on different platforms: on Linux it is based on epoll, on FreeBSD on kqueue, and on Windows on IOCP.

Since our code is deployed on Linux, this article walks through the source code of I/O multiplexing in Go using the epoll-based implementation as the example.

Introduction

I/O Multiplexing

So-called I/O multiplexing refers to the select/epoll family of multiplexers: they allow a single thread to monitor multiple file descriptors (I/O events) at the same time, block while waiting, and receive a notification as soon as one of the file descriptors becomes readable. In case some readers are not familiar with select or epoll, let's first go over these two multiplexers.

First, a word about file descriptors (file descriptor, FD for short). An FD is an abstraction used to refer to a file: it is an index value pointing into the table of open files that the kernel maintains for each process. When a program opens an existing file or creates a new one, the kernel returns a file descriptor to the process.
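For instance, you can observe the descriptor the kernel hands back from Go. This is a minimal sketch of my own, not from the original article; the path is just an example:

package main

import (
    "fmt"
    "os"
)

func main() {
    // Opening a file makes the kernel add an entry to this process's
    // open-file table and return its index: the file descriptor.
    f, err := os.Open("/etc/hosts")
    if err != nil {
        fmt.Println("open error:", err)
        return
    }
    defer f.Close()
    // Typically prints 3, since 0, 1 and 2 are stdin, stdout and stderr.
    fmt.Println("file descriptor:", f.Fd())
}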

select

int select(int nfds,
           fd_set *restrict readfds,
           fd_set *restrict writefds,
           fd_set *restrict errorfds,
           struct timeval *restrict timeout);

readfds, writefds, and errorfds (often called exceptfds) are three sets of file descriptors. select iterates over the first nfds descriptors of each set and finds, respectively, the descriptors that are readable, writable, or have an exceptional condition pending; these are collectively called the ready descriptors.

The timeout parameter sets how long the select call blocks. If no file descriptor is ready, the calling process blocks until some descriptor becomes ready or the timeout elapses, then returns. If timeout is NULL, select blocks indefinitely until a descriptor is ready; if timeout is set to 0, select returns immediately without blocking.

When the select function returns, the ready descriptors can be found by traversing the fd sets.
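To make this concrete, here is a minimal sketch of the select pattern in Go through the golang.org/x/sys/unix wrappers. This example is my own illustration rather than the article's; it assumes Linux and watches stdin for readability:

package main

import (
    "fmt"
    "os"

    "golang.org/x/sys/unix"
)

func main() {
    fd := int(os.Stdin.Fd())

    var readfds unix.FdSet
    readfds.Zero()
    readfds.Set(fd)

    // Block for up to 5 seconds waiting for stdin to become readable.
    tv := unix.Timeval{Sec: 5}
    n, err := unix.Select(fd+1, &readfds, nil, nil, &tv)
    if err != nil {
        fmt.Println("select error:", err)
        return
    }
    if n == 0 {
        fmt.Println("timeout: no descriptor ready")
        return
    }
    // select only reports *how many* fds are ready; the program must
    // scan the set itself to find out which ones.
    if readfds.IsSet(fd) {
        fmt.Println("stdin is readable")
    }
}

Note how, after Select returns, the program has to scan the set itself with IsSet; this is exactly the per-descriptor traversal criticized below.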

(Figure: the I/O multiplexing model)

select has several notable shortcomings:

  1. select's biggest drawback is that there is a limit on the number of FDs a single process can open; it is set by FD_SETSIZE, and the default value is 1024;
  2. every call to select has to copy the fd sets from user space into kernel space, which is expensive when there are many fds;
  3. the kernel has to linearly scan the entire fd_set every time, so as the number of monitored descriptors grows, I/O performance degrades linearly;

epoll

epoll is an enhanced version of select that avoids both of these shortcomings: the high performance overhead and the small limit on file descriptors.

To make the following content easier to understand, let's first look at how epoll is used:

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, ...);
listen(listenfd, ...);

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // add every fd that needs monitoring to epfd

while (1) {
    int n = epoll_wait(...); // block until at least one fd is ready
    for (/* each socket with pending data */) {
        // handle it
    }
}

First, epoll_create is used to create an epoll instance, epfd; it also returns a file descriptor that refers to the instance. The returned file descriptor only points to the corresponding epoll instance and does not represent a real disk file node.

Internally, an epoll instance stores:

  • a monitor list: all file descriptors being listened on, kept in a red-black tree;
  • a ready list: all file descriptors that are ready, kept in a linked list;

Then epoll_ctl adds the fds to be monitored to epfd: it sets a callback for each fd, registers the events to watch, and adds the fd to the monitor list. When an event occurs, the kernel invokes the callback, which adds the fd to the epoll instance's ready queue.

Finally, epoll_wait blocks and monitors I/O events on all the fds of the epoll instance. When the ready list is non-empty, epoll_wait returns immediately, which solves select's problem of having to poll every descriptor each time.

The advantages of epoll:

epoll keeps its monitor list in a red-black tree; every fd added through epoll_ctl is placed in a node of that tree. Insertion and deletion in a red-black tree are stable, with O(log N) time complexity, and the tree can hold a large number of fds, avoiding the limit of only 1024 fds;

epoll_ctl registers a callback for each file descriptor, and when a descriptor becomes ready the callback adds it to the ready list; so there is no need to traverse every file descriptor as select does, it is enough to check whether the ready list is empty;
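To tie the pieces together, here is a hedged Go sketch of the same socket/bind/listen/epoll flow through the golang.org/x/sys/unix wrappers (my illustration, not the article's; Linux only, error handling omitted, and the port number is arbitrary):

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    // socket / bind / listen, mirroring the pseudocode above
    listenfd, _ := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
    unix.Bind(listenfd, &unix.SockaddrInet4{Port: 9999})
    unix.Listen(listenfd, 128)

    // create the epoll instance and add listenfd to its monitor list
    epfd, _ := unix.EpollCreate1(0)
    unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, listenfd,
        &unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(listenfd)})

    events := make([]unix.EpollEvent, 64)
    for {
        // block until at least one monitored fd is on the ready list
        n, _ := unix.EpollWait(epfd, events, -1)
        for i := 0; i < n; i++ {
            if events[i].Fd == int32(listenfd) {
                connfd, _, _ := unix.Accept(listenfd)
                fmt.Println("accepted new connection, fd:", connfd)
                unix.Close(connfd)
            }
        }
    }
}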

Analysis

In essence, netpoll is a wrapper around I/O multiplexing, so like any other user of epoll it cannot avoid the following steps:

  1. create and initialize netpoll;
  2. add the fds to be monitored to netpoll;
  3. fetch the triggered events from netpoll;

In Go, the three functions that epoll provides are wrapped as:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

netpollinit is responsible for initializing netpoll;

netpollopen is responsible for registering a file descriptor's events to be monitored;

netpoll blocks, waiting, and returns a list of ready goroutines;

Here is a TCP server written in Go:

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    }
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        }
        // create a goroutine responsible for the read/write work
        go HandleConn(conn)
    }
}

Now let's follow this TCP server and see where it uses netpoll to make the epoll calls.

net.Listen

This TCP server calls net.Listen to create a socket and at the same time obtain the corresponding fd. The fd is used to initialize the listener's netFD (Go's wrapper around a network file descriptor), and then netFD's listenStream method is called to perform the bind & listen on the socket and to initialize the netFD.

The procedure is as follows:

(Figure: the net.Listen call flow)

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // create a socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    ...
    // create the fd
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // call netFD's listenStream method to finish the socket's
            // bind & listen and the netFD initialization
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            ...
        }
    }
    ...
    return fd, nil
}

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}

The sysSocket method issues a system call to create the socket, and newFD creates the netFD; then netFD's listenStream method performs the bind & listen operations and initializes the netFD.

netFD

netFD is a wrapper around a file descriptor. A netFD contains an FD data structure, and the FD in turn contains two important fields, Sysfd and pollDesc. Sysfd is the system file descriptor of the socket returned by sysSocket, while pollDesc is used to monitor whether the file descriptor is readable or writable.
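Abridged from the go 1.15 sources (net/fd_unix.go, internal/poll/fd_unix.go, and internal/poll/fd_poll_runtime.go), the nesting looks roughly like this, with most fields omitted:

// net.netFD (net/fd_unix.go)
type netFD struct {
    pfd poll.FD // the internal/poll file descriptor

    // immutable until Close
    family int
    sotype int
    net    string
    laddr  Addr
    raddr  Addr
}

// poll.FD (internal/poll/fd_unix.go)
type FD struct {
    Sysfd int      // the socket's system file descriptor from sysSocket
    pd    pollDesc // wraps the pointer to the runtime's pollDesc
    // ...
}

// poll.pollDesc (internal/poll/fd_poll_runtime.go)
type pollDesc struct {
    runtimeCtx uintptr // points at the runtime-level pollDesc
}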

Let's continue with listenStream:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ...
    // perform the bind operation
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // perform the listen operation
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // initialize the fd
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

The listenStream method calls Bind to bind the fd, then calls listenFunc to start listening on it, and finally calls the fd's init method, which completes the initialization of the FD and its pollDesc.

func (pd *pollDesc) init(fd *FD) error {
    // calls runtime.poll_runtime_pollServerInit
    serverInit.Do(runtime_pollServerInit)
    // calls runtime.poll_runtime_pollOpen
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

runtime_pollServerInit is wrapped in a sync.Once, which guarantees it is called only once; on Linux, this function creates an epoll file descriptor instance;

poll_runtime_pollOpen calls netpollopen to register the fd with the epoll instance, and returns a pollDesc;

netpollinit initialization

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

netpollGenericInit calls the platform-specific implementation of netpollinit; on Linux it ends up in the netpollinit of netpoll_epoll.go:

var (
    epfd int32 = -1 // epoll descriptor
)

func netpollinit() {
    // create a new epoll file descriptor
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
    // create a pipe used for communication
    r, w, errno := nonblockingPipe()
    ...
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // add the pipe's read end to the monitored descriptors
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

The call to epollcreate1 creates an epoll file descriptor instance; note that epfd is a global variable. netpollinit then creates a pipe for communication (used to wake up a blocked epoll_wait) and calls epollctl to add the pipe's read end to the monitored descriptors.

netpollopen event registration

Next, let's look at the poll_runtime_pollOpen method:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

The poll_runtime_pollOpen method allocates a pollDesc structure through pollcache.alloc, which initializes blocks of roughly 4 KB at a time. It then resets the pd's fields and calls netpollopen to add a new polling event to the epoll instance epfd, monitoring the file descriptor for readability and writability.
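Note that the registration includes _EPOLLET, i.e. edge-triggered mode. Expressed through the golang.org/x/sys/unix wrappers (my fragment, not runtime code; fd and epfd assumed in scope, and the runtime stores a *pollDesc pointer in ev.data rather than the fd), the equivalent registration would be:

ev := unix.EpollEvent{
    // read/write interest, peer-close notification, edge-triggered:
    // the same flags netpollopen passes to epoll_ctl
    Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
    Fd:     int32(fd),
}
err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, fd, &ev)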

Let's take a look at how pollCache initializes the pollDesc.

type pollCache struct {
    lock  mutex
    first *pollDesc
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    // initialize the list head on first use
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // carve the block into a linked list of pollDescs
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}

If pollCache's list head is empty, alloc initializes it first: one allocation is carved into a linked list of pollDescs, and every subsequent call pops the unused pollDesc at the head of the list and returns it.
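The same pattern in miniature, as a standalone sketch of my own (the runtime uses persistentalloc and never returns memory; this toy version uses make and a mutex instead):

package main

import (
    "fmt"
    "sync"
)

// A miniature of the pollCache pattern: carve one allocation into
// fixed-size nodes and hand them out from an intrusive free list.
type node struct {
    link    *node
    payload [64]byte // stand-in for pollDesc's fields
}

type cache struct {
    mu    sync.Mutex
    first *node
}

func (c *cache) alloc() *node {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.first == nil {
        // first use: allocate a block and link every node into the list
        block := make([]node, 64)
        for i := range block {
            block[i].link = c.first
            c.first = &block[i]
        }
    }
    n := c.first
    c.first = n.link
    return n
}

func (c *cache) free(n *node) {
    c.mu.Lock()
    n.link = c.first
    c.first = n
    c.mu.Unlock()
}

func main() {
    var c cache
    n := c.alloc()
    fmt.Printf("allocated node: %p\n", n)
    c.free(n)
}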

(Figure: the pollCache free list)

That completes the analysis of net.Listen; now let's look at listen.Accept.

Listener.Accept

The Listener.Accept method eventually lands in netFD's accept method:

(Figure: the Accept call chain)

func (fd *netFD) accept() (netfd *netFD, err error) {
    // call netfd.FD's Accept to accept the new socket connection
    // and return the new socket's fd
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // construct a new netfd
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // call netFD's init method to complete the initialization
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

This method first calls FD's Accept to accept a new socket connection and obtain the fd of the new socket, then calls newFD to construct a new netfd and completes its initialization via the init method.

We have already seen the init method above, so let's look at the Accept method:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // use the Linux accept system call to receive a new
        // connection and create the corresponding socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // if the expected I/O event has not occurred yet,
                // waitRead parks the goroutine and blocks here
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

The FD.Accept method uses the Linux accept system call to receive new connections and create the corresponding sockets. If there is nothing to accept yet, waitRead blocks. The goroutines parked this way are woken up when goroutine scheduling calls runtime.netpoll.

pollWait event waiting

pollDesc.waitRead actually calls runtime.poll_runtime_pollWait:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    // enter netpollblock and check whether the expected I/O event
    // has occurred
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // this for loop waits for io ready or io wait
    for {
        old := *gpp
        // gpp == pdReady means the expected I/O event has already
        // arrived; return directly, unblocking the current goroutine
        // so it can perform the corresponding I/O operation
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // no expected I/O event yet: atomically set gpp to pdWait
        // and exit the for loop
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // yield the current thread, putting the goroutine to sleep
        // until the runtime wakes it up
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait calls netpollblock in a for loop to check whether the expected I/O event has occurred, and only exits the loop when netpollblock returns true, meaning the io is ready.

The netpollblock method checks whether the current state is pdReady; if so, it returns true immediately. If not, it sets gpp to pdWait via CAS and exits the for loop. Then gopark parks the current goroutine until the corresponding fd becomes readable/writable or another I/O event occurs.

Again, goroutines parked like this are woken up when goroutine scheduling calls runtime.netpoll.
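For reference, the values behind this little state machine, abridged from runtime/netpoll.go:

const (
    pdReady uintptr = 1 // an I/O event arrived and has not been consumed yet
    pdWait  uintptr = 2 // a goroutine is committing to park on this fd
)
// Any other non-zero value stored in rg/wg is the pointer to the parked g.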

netpoll polling and waiting

The core logic of runtime.netpoll is: compute the timeout for epoll_wait from the delay parameter; call epoll_wait to fetch the list of IO-ready fds from the epoll instance's eventpoll.rdllist doubly linked list; then iterate over the fd list returned by epoll_wait and, using the context information stored when each fd was registered with epoll_ctl, assemble the runnable goroutines and return them.

After netpoll executes, it returns the list of goroutines corresponding to the ready fds; these ready goroutines are then put onto the run queue to wait to be scheduled.

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // the delay parameter is in nanoseconds; convert it to milliseconds
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // wait for a file descriptor to become readable or writable
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // on a negative return value, call epollwait again
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    // n > 0: some monitored file descriptors have pending events
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        ...
        // determine the type of the event: read or write
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // extract the pollDesc stored in the epollevent's data field
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // call netpollready with the ready fd's pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

netpoll calls epollwait to get the list of ready fds; the corresponding epoll function is epoll_wait. toRun is a linked list of g's holding the goroutines to be resumed, and it is returned to the caller at the end. If epollwait returns an n greater than zero, the monitored file descriptors have pending events, which the for loop must process: it sets mode according to the event type, then extracts the corresponding pollDesc and calls the netpollready method.

Let's take a look at netpollready:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    // get pointers to the corresponding g's
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // add the g's to the toRun list
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    // choose the semaphore based on the mode passed in
    if mode == 'w' {
        gpp = &pd.wg
    }
    for {
        // read the value currently stored in gpp
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // CAS the read or write semaphore to pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            // return the pointer to the corresponding g
            return (*g)(unsafe.Pointer(old))
        }
    }
}

Now that we are done with runtime.netpoll, there is one more thing in the source worth noting: runtime.netpoll is called from two places:

  • the scheduler runs runtime.schedule(), which calls runtime.findrunnable(); runtime.findrunnable() calls runtime.netpoll to get the goroutines that are ready to run;
  • when the program starts, the Go runtime creates a separate monitor thread, sysmon; sysmon runs every 20us~10ms, and on each run it checks whether more than 10ms have passed since the last netpoll call and, if so, calls runtime.netpoll once (see the sketch after this list);
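As promised above, here is an abridged, paraphrased sketch of the sysmon side (based on runtime/proc.go in go 1.15; not the verbatim source):

// inside sysmon's endless loop, abridged
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    list := netpoll(0) // non-blocking: collect whatever is already ready
    if !list.empty() {
        incidlelocked(-1)
        injectglist(&list) // make the ready goroutines runnable
        incidlelocked(1)
    }
}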

Readers interested in these entry points can go and look at them on their own.

Summary

This article started from I/O multiplexing, introduced select and epoll, and then went back to Go to see how the language implements this multiplexing structure. Tracing the source code shows that Go, too, wraps its own functions around epoll:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

These three functions cover creating the epoll instance, registering fds with it, and waiting for events.

Readers who are not very familiar with I/O multiplexing can also take this opportunity to study network programming in more depth and broaden their knowledge.

References

https://www.infoq.cn/article/boeavgkiqmvcj8qjnbxk

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-netpoller/

https://zhuanlan.zhihu.com/p/64138532

https://imageslr.github.io/2020/02/27/select-poll-epoll.html

http://singlecool.com/2020/12/13/golang-netpoll/
