本文发自 http://www.binss.me/blog/commonly-used-server-programming-model/,转载请注明出处。

最近在阅读陈硕的《Linux多线程服务端编程:使用muduo C++网络库》,有感于书中“多线程服务器的常用编程模型”一节,通过查阅相关资料,对当前常用的服务端模型进行学习和尝试,并本文作为总结。

谈到服务端,脑海中往往会响起这样的话“单台机器并发多少?QPS多少?”。IO性能,俨然成为了服务端编程实现的重要关注点,针对这个问题,各路大牛纷纷出手,开发了各式各类的网络库和框架来解决这个问题。后人将之分门别类,归纳整理,总结出了服务端编程模型的概念。后人需要开发服务端时,只需按照模型实现即可,极大地降低了开发门槛。

本文使用Python3来实现各个模型下的EchoServer,作为示例。为了简略,省略了一些的import和常量,且不考虑一些异常的抛出,如ECONNRESET等。各个EchoServer接口统一,只需使用以下语句启动即可:

server = EchoServer(HOST, PORT)
server.run()

没有模型

洪荒年代,往往直接循环调用accept、recv、send。模式为同步阻塞式IO。

class EchoServer:
    def __init__(self, host, port):
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)

    def handle_echo(self, conn):
        while True:
            data = conn.recv(BUF_SIZE)
            if data:
                conn.sendall(data)
            else:
                conn.close()
                break

    def run(self):
        while True:
            conn, _ = self.listen_socket.accept()
            self.handle_echo(conn)

优点:逻辑最简单,最容易实现。

缺点:并发数只有1,即服务端同时只能够处理一个用户的请求,在处理期间(比如阻塞在recv接收数据时),其他用户无法连接到服务端。

多进程模型

主进程监听请求,对每个连接都使用一个子进程独立去处理。子进程的流程通常阻塞在数据的读写上。

一般有两种模式:

  • 动态创建进程 + 阻塞式同步IO

    对每个连接都创建一个进程去处理,处理完毕后进程被销毁。

    例如:

    class EchoServer:
        def __init__(self, host, port):
            self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listen_socket.bind((host, port))
            self.listen_socket.listen(128)
    
        def handle_echo(self, conn):
            while True:
                data = conn.recv(BUF_SIZE)
                if data:
                    conn.sendall(data)
                else:
                    conn.close()
                    break
    
        def run(self):
            while True:
                conn, address = self.listen_socket.accept()
                ret = os.fork()
                if ret == 0:
                    self.handle_echo(conn)
                    return

    优点:逻辑简单,容易实现。每个进程都有自己独立的运行空间,即使core掉也不会互相影响。

  • 进程池 + 阻塞式同步IO

    每个请求都从进程池中选取一个进程去处理,处理完毕后进程池回收进程。固定的进程数目避免了频繁的构造和析构进程的开销。

    本来想通过multiprocessing.Pool实现,然而发现在使用时由于主进程处于阻塞状态,竟导致子进程无法执行。因此还是自己实现了进程池:

    import multiprocessing
    
    class EchoServer:
        def __init__(self, host, port):
            self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.listen_socket.bind((host, port))
            self.listen_socket.listen(128)
            self.processes = 3
            self.pool = multiprocessing.Queue(self.processes)
            for _ in range(self.processes):
                multiprocessing.Process(target=self.handle_echo).start()
    
        def handle_echo(self):
            while True:
                conn = self.pool.get()
                while True:
                    data = conn.recv(BUF_SIZE)
                    if data:
                        conn.sendall(data)
                    else:
                        conn.close()
                        break
    
        def run(self):
            while True:
                conn, address = self.listen_socket.accept()
                self.pool.put(conn)

缺点:创建新进程需要分配新的内存堆栈,进程切换带来较大的上下文切换开销。更严重的是,由于进程对内存的消耗大,面对C10K问题,往往无法创建上万个进程来应对上万的并发。

多线程模型

多线程模型与多进程模型实现差不多。

import _thread

class EchoServer:
    def __init__(self, host, port):
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)

    def handle_echo(self, conn):
        while True:
            data = conn.recv(BUF_SIZE)
            if data:
                conn.sendall(data)
            else:
                conn.close()
                break

    def run(self):
        while True:
            conn, address = self.listen_socket.accept()
            _thread.start_new_thread(self.handle_echo, (conn, ))

优点:创建、析构、切换开销和内存占用比进程小,线程间可以共享数据。

缺点:虽然开销比进程小,但是依然不容小视。由于线程间可以资源共享,对临界资源的必须采取相应的访问控制。此外在Python的主流实现CPython中,由于GIL(Global interpreter lock)的关系,同一时间只会有一个获得了GIL的线程在跑,使得程序无法利用物理多核的性能加速运算。

事件驱动模型

程序被动地等待事件的发生,当事件发生时,模型会调用事先注册好的相应事件处理函数(handler)。

在多进程、多线程模型中,操作在阻塞时(如recv、accept)仍以进程、线程的形式耗费着资源。而事件驱动模型只有在阻塞操作就绪事件发生时才进行相应资源的分配,自然大大地提高了能够处理的并发数。

事件驱动分为两种模式:

  • Reactor(非阻塞式同步IO)

    通过事件循环(event loop)和IO多路复用(select/poll/epoll/kqueue)实现。

    处理者(handler)向事件分离器注册一个读写操作,让其等待该操作的发生(比如文件描述符、socket可读写)。当操作发生时,事件分离器把事件传给事先注册的事件处理函数,由后者来进行实际的读写操作:

    class EchoServer:
        def __init__(self, host, port):
            listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listen_socket.setblocking(0)
            listen_socket.bind((host, port))
            listen_socket.listen(128)
            self._fd_map = {}
            self._fd_map[listen_socket.fileno()] = (listen_socket, self.handle_conn)
            self._epoll = select.epoll()
            self._epoll.register(listen_socket.fileno(), EPOLLIN)
    
        def handle_echo(self, conn):
            data = conn.recv(BUF_SIZE)
            if data:
                conn.sendall(data)
            else:
                self._epoll.unregister(conn.fileno())
                conn.close()
    
        def handle_conn(self, listen_socket):
            try:
                conn, address = listen_socket.accept()
            except socket.error as e:
                if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
                    raise
                return
            conn.setblocking(0)
            self._fd_map[conn.fileno()] = (conn, self.handle_echo)
            self._epoll.register(conn.fileno(), EPOLLIN)
    
        # 处理回调
        def handler(self, fd, events):
            if events & EPOLLIN:
                socket, fd_handle = self._fd_map.get(fd, None)
                if socket and fd_handle:
                    fd_handle(socket)
    
        def run(self):
            while True:
                try:
                    events = self._epoll.poll(POLL_TIMEOUT)
                except Exception as e:
                    if e.errno in (errno.EPIPE, errno.EINTR):
                        # EPIPE: Happens when the client closes the connection
                        # EINTR: Happens when received a signal
                        pass
                    else:
                        print("Poll Exception: %s", e)
                    continue
    
                for fd, event in events:
                    self.handler(fd, event)

    优点:系统提供了事件分离器API:Linux下有epoll,BSD/Mac下有kqueue,Windows有iocp,实在不行还能退化到select和poll。利用API,可以方便地实现高并发服务器。

  • Proactor(非阻塞式异步IO)

    处理者(handler)直接发起一个异步读写操作,并提供相应的参数,如用于存放读到(发送)数据的缓存区,读的数据大小,以及请求完成后的回调函数等。

    事件分离器(event demultiplexor)等待操作的完成(比如文件描述符、socket可读写),在此过程中,操作系统利用并行的内核线程执行实际读写操作,并将结果数据存入处理者提供缓冲区。读写完毕后,通知事件分离器。事件分离器把事件和缓冲区转发给处理者。

    Google一下,了解到Python的异步IO主要依赖于asyncio库。用户只需继承asyncio.Protocol,实现connection_made、data_received、connection_lost等主要函数即可实现异步IO:

    import asyncio
    
    class EchoProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport
    
        def data_received(self, data):
            self.transport.write(data)
            self.transport.close()
    
    class EchoServer:
        def __init__(self, host, post):
            self.loop = asyncio.get_event_loop()
            server = self.loop.create_server(EchoProtocol, host, post)
            asyncio.async(server)
    
        def run(self):
            self.loop.run_forever()

对比Reactor的代码,我们发现Proactor无需自己调用recv和accept函数:在有新连接就绪时,connection_made会被调用,新连接以transport参数传入;在连接接收到数据时,data_received会被调用,数据以data参数传入。

为了一探“操作系统利用并行的内核线程执行实际读写操作”的具体使用姿势,我查看了asyncio的代码,发现在Unix下只是库代替我们在每次poll返回调用read,然后再通过data参数传递给data_received而已,其本质上只是使用Reactor来模拟Proactor的“异步”方式。之所以说是模拟,是因为在Proactor的定义中对缓冲区的实际读写操作是由操作系统进行的,而不象asyncio这样由库来代劳。

难道没有真正的Proactor吗?我以Proactor为关键字对代码进行搜索,发现了proactor_events.py。

查看代码发现,在该实现中,data_received()是在_loop_reading()中当future中的result就绪(self._state == _FINISHED)时取出result并作为data参数。该future通过_loop_reading()的参数传入。那_loop_reading()何时会被调用?

在_loop_reading()中我发现了以下两行:

self._read_fut = self._loop._proactor.recv(self._sock, 4096)
self._read_fut.add_done_callback(self._loop_reading)

_read_fut是通过self._loop._proactor.recv()产生的。转到定义发现,该recv()和socket的recv()大大不同:

def recv(self, conn, nbytes, flags=0):
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        def finish_recv(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_recv)

该recv()在调用WSARecv()后通过_register()生成了一个future对象并返回。从代码上判断,WSARecv()应该是非阻塞的,因为:

  • recv()为future注册finish_recv()作为完成时的回调函数

  • _poll()中构造了一个事件循环,不断通过GetQueuedCompletionStatus()查询是否就绪,如果就绪,future的set_result()被调用,结果被设置,于是回调函数finish_recv()被调用,结果通过getresult()取出。

future的结果被设置后,之前_loop_reading()中为该future添加的回调函数被调用,没错,回调函数还是_loop_reading(),并把future自身作为参数调用。如此一来,问题就解决了:每次的_loop_reading()调用,在将上次recv()的结果通过data_received()传给用户的同时,通过recv()创建新的future,并为其注册得到结果后的回调。

这才是真正的Proactor。WSARecv()、GetQueuedCompletionStatus()和getresult()都是操作系统提供的接口,更准确来说,是对操作系统提供接口的封装,毕竟万物基于c(滑稽)。具体封装可在overlapped.c中找到:

static PyObject * Overlapped_WSARecv(OverlappedObject *self, PyObject *args)
{
    ......

    self->type = TYPE_READ;
    self->handle = handle;
    self->read_buffer = buf;
    wsabuf.len = size;
    wsabuf.buf = PyBytes_AS_STRING(buf);

    Py_BEGIN_ALLOW_THREADS
    ret = WSARecv((SOCKET)handle, &wsabuf, 1, &nread, &flags,
                  &self->overlapped, NULL);
    Py_END_ALLOW_THREADS

    ......
}


static PyObject * overlapped_GetQueuedCompletionStatus(PyObject *self, PyObject *args)
{
    ......

    Py_BEGIN_ALLOW_THREADS
    ret = GetQueuedCompletionStatus(CompletionPort, &NumberOfBytes,
                                    &CompletionKey, &Overlapped, Milliseconds);
    Py_END_ALLOW_THREADS

    err = ret ? ERROR_SUCCESS : GetLastError();
    if (Overlapped == NULL) {
        if (err == WAIT_TIMEOUT)
            Py_RETURN_NONE;
        else
            return SetFromWindowsErr(err);
    }
    return Py_BuildValue(F_DWORD F_DWORD F_ULONG_PTR F_POINTER,
                         err, NumberOfBytes, CompletionKey, Overlapped);
}


static PyObject * Overlapped_getresult(OverlappedObject *self, PyObject *args)
{
    ......

    Py_BEGIN_ALLOW_THREADS
    ret = GetOverlappedResult(self->handle, &self->overlapped, &transferred,
                              wait);
    Py_END_ALLOW_THREADS

    self->error = err = ret ? ERROR_SUCCESS : GetLastError();
    switch (err) {
        case ERROR_SUCCESS:
        case ERROR_MORE_DATA:
            break;
        case ERROR_BROKEN_PIPE:
            if ((self->type == TYPE_READ || self->type == TYPE_ACCEPT) && self->read_buffer != NULL)
                break;
            /* fall through */
        default:
            return SetFromWindowsErr(err);
    }

    switch (self->type) {
        case TYPE_READ:
            assert(PyBytes_CheckExact(self->read_buffer));
            if (transferred != PyBytes_GET_SIZE(self->read_buffer) &&
                _PyBytes_Resize(&self->read_buffer, transferred))
                return NULL;
            Py_INCREF(self->read_buffer);
            return self->read_buffer;
        default:
            return PyLong_FromUnsignedLong((unsigned long) transferred);
    }
}

从中我们找到操作系统的真正接口:WSARecv、GetQueuedCompletionStatus和GetOverlappedResult。蛋疼的是,我对这三个接口都很陌生,果断Google之,发现原来是Windows的接口:

WSARecv function

Receives data from a connected socket or a bound connectionless socket.

int WSARecv( In SOCKET s, Inout LPWSABUF lpBuffers, In DWORD dwBufferCount, Out LPDWORD lpNumberOfBytesRecvd, Inout LPDWORD lpFlags, In LPWSAOVERLAPPED lpOverlapped, In LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );

GetQueuedCompletionStatus function

Attempts to dequeue an I/O completion packet from the specified I/O completion port. If there is no completion packet queued, the function waits for a pending I/O operation associated with the completion port to complete.

BOOL WINAPI GetQueuedCompletionStatus(

In HANDLE CompletionPort, Out LPDWORD lpNumberOfBytes, Out PULONG_PTR lpCompletionKey, Out LPOVERLAPPED *lpOverlapped, In DWORD dwMilliseconds );

GetOverlappedResult function

Retrieves the results of an overlapped operation on the specified file, named pipe, or communications device.

BOOL WINAPI GetOverlappedResult( In HANDLE hFile, In LPOVERLAPPED lpOverlapped, Out LPDWORD lpNumberOfBytesTransferred, In BOOL bWait );

可见WSARecv只需通过lpBuffers传入缓冲区(包括指针和长度),即可以异步地读取数据。操作系统把读到的数据写入传入的缓冲区中,这里为self->read_buffer,直到写完时,lpCompletionRoutine回调被调用,但是由于我们没有设置回调函数(NULL),所以需要通过事件循环不断调用GetQueuedCompletionStatus查询接收操作是否就绪,如果就绪,调用GetOverlappedResult查询读取是否成功(因为就绪还有可能是因为出错)。如果成功,返回已由操作系统填充好数据的self->read_buffer。复杂性全由操作系统承担,用起来真是舒服,但是为啥只有Windows有啊!

看回proactor_events.py,发现其在文件最顶部就做了声明:

A proactor is a "notify-on-completion" multiplexer. Currently a proactor is only implemented on Windows with IOCP.

缺点:

  1. 非抢占。任务间无优先级概念,一个需要100s处理的请求可能阻塞100个1s就能够处理的请求。

  2. 为了实现非阻塞,自然回调函数也不能阻塞,为此,要么将业务逻辑拆分成若干个回调(状态机),要么维护好上下文状态,遇到阻塞时跳出(协程)。

More

在没有新的模型提出来之前,我们能玩的花样无非就是将以上的模型进行组合。比如Reactor + process、Reactor + Process Pool、Reactor + Thread、Reactor + Thread Pool等等等等。陈硕开发的muduo库,就是Reactor和Thread Pool的组合,一个线程一个event loop,充分发挥多核服务器的性能,以并行解决并发问题。

总结

通过以上对服务端常见模型的分析,我们对其有了初步的了解,在应用时自然而然会做出合适的选择。事实上,在当前C10K的背景下,我们越来越倾向于使用Reactor和Proactor来应对数以万计的的连接。这也就是Nginx能过从霸主Apache中不断抢得份额的原因。

而同样是事件驱动模型,我们发现比起Reactor,Proactor由操作系统进行读写,在操作系统有支持的情况下,性能更高、伸缩性更好。但是由于“Linux社区对于填上aio的坑并没太大的动力”,为了统一异步接口,我们只好对Proactor进行模拟,模拟的结果,自然是在性能上有所损失。

参考:

http://zhuoqiang.me/python-thread-gil-and-ctypes.html

https://en.wikipedia.org/wiki/Event-driven_programming

https://msdn.microsoft.com/en-us/library/windows/desktop/ms741688(v=vs.85).aspx

https://msdn.microsoft.com/en-us/library/windows/desktop/aa364986(v=vs.85).aspx

https://msdn.microsoft.com/en-us/library/windows/desktop/ms683209(v=vs.85).aspx

http://www.zhihu.com/question/22064431