Libevent印象,说说异步IO
Toc
  1. Example: A simple blocking HTTP client
  2. Bad Example
  3. Example: Forking ROT13 server
  4. Bad Example: busy-polling all sockets
  5. Example: Using select
  6. Example: select()-based ROT13 server
  7. Example: A low-level ROT13 server with Libevent
  • 使用Libevent方便吗,在Windows下又如何
    1. Example: A simpler ROT13 server with Libevent
  • 真的有这么高效率?
  • 原文:Chapter 1: A tiny introduction to asynchronous IO.

    很多人开始时使用的是阻塞式IO函数。
    如果一个函数在操作完成之前,或者在超时之前,都不会返回,那么就说这个函数是同步的。
    比如当你对一个TCP连接调用connect(),你的操作系统会有一个队列,一个保存发送出去的SYN请求的队列。
    然后对于每个SYN请求,系统尝试等待TCP另一端返回对应的确认码,即SYN ACK
    在确认码ACK返回之前,或者直到超时,同步的函数是不会返回,称之为阻塞IO。

    下面有个使用阻塞IO函数的例子,它打开一个连接,连接到www.google.com,发送一个简单的HTTP请求,然后打印出返回内容到stdout。

    Example: A simple blocking HTTP client

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    /* For gethostbyname */
    #include <netdb.h>
    
    #include <unistd.h>
    #include <string.h>
    #include <stdio.h>
    
    int main(int c, char **v)
    {
        const char query[] =
            "GET / HTTP/1.0\r\n"
            "Host: www.google.com\r\n"
            "\r\n";
        const char hostname[] = "www.google.com";
        struct sockaddr_in sin;
        struct hostent *h;
        const char *cp;
        int fd;
        ssize_t n_written, remaining;
        char buf[1024];
    
        /* Look up the IP address for the hostname.  
           Watch out; 
           this isn't threadsafe on most platforms. */  
        h = gethostbyname(hostname);
        if (!h) {
            fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno));
            return 1;
        }
        if (h->h_addrtype != AF_INET) {
            fprintf(stderr, "No ipv6 support, sorry.");
            return 1;
        }
    
        /* Allocate a new socket */
        fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
            perror("socket");
            return 1;
        }
    
        /* Connect to the remote host. */
        sin.sin_family = AF_INET;
        sin.sin_port = htons(80);
        sin.sin_addr = *(struct in_addr*)h->h_addr;
        if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) {
            perror("connect");
            close(fd);
            return 1;
        }
    
        /* Write the query. */
        /* XXX Can send succeed partially? */
        cp = query;
        remaining = strlen(query);
        while (remaining) {
          n_written = send(fd, cp, remaining, 0);
          if (n_written <= 0) {
            perror("send");
            return 1;
          }
          remaining -= n_written;
          cp += n_written;
        }
    
        /* Get an answer back. */
        while (1) {
            ssize_t result = recv(fd, buf, sizeof(buf), 0);
            if (result == 0) {
                break;
            } else if (result < 0) {
                perror("recv");
                close(fd);
                return 1;
            }
            fwrite(buf, 1, result, stdout);
        }
    
        close(fd);
        return 0;
    }
    

    上面使用的网络相关函数都是阻塞式的。

    1. gethostbyname在成功解析域名www.google.com或超时前是不会返回的;
    2. connect在成功连接后才返回;
    3. recv接收数据才返回,或者对方关闭了sock也会让recv返回;
    4. send也阻塞,直到把数据复制到系统内核buffer之中。

    如果你在同一时间内只做一个事情,IO阻塞函数也没有什么不好。但假若你的程序里要同时响应多个连接,比如你需要同时从2个连接sock中接收数据,而且你不知道哪个数据先到来,你不能这样写你的程序:

    Bad Example

    /* This won't work. */
    char buf[1024];
    int i, n;
    while (i_still_want_to_read()) {
        for (i=0; i<n_sockets; ++i) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n==0)
                handle_close(fd[i]);
            else if (n<0)
                handle_error(fd[i], errno);
            else
                handle_input(fd[i], buf, n);
        }
    }
    

    因为如果fd[2]的数据先到来的话,这段代码甚至不会尝试去读取fd[1]的数据,因为它要求数据是按顺序到来,而这明显是简单地想当然了。

    当然也可以使用多个进程/线程来处理每个sock,每个sock的数据处理互不影响,A进程阻塞了,并不影响到B进程的工作。
    举例,下面的代码是我们的ROT13服务端代码,使用unix的fork()为每个连接建立一个进程。从端口40713中每次读取一行,对读取的该行每个字符做点转换,然后把该行数据写回。

    Example: Forking ROT13 server

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    
    #include <unistd.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    #define MAX_LINE 16384
    
    char
    rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    void
    child(int fd)
    {
        char outbuf[MAX_LINE+1];
        size_t outbuf_used = 0;
        ssize_t result;
    
        while (1) {
            char ch;
            result = recv(fd, &ch, 1, 0);
            if (result == 0) {
                break;
            } else if (result == -1) {
                perror("read");
                break;
            }
    
            /* We do this test to keep the user from overflowing the buffer. */
            if (outbuf_used < sizeof(outbuf)) {
                outbuf[outbuf_used++] = rot13_char(ch);
            }
    
            if (ch == '\n') {
                send(fd, outbuf, outbuf_used, 0);
                outbuf_used = 0;
                continue;
            }
        }
    }
    
    void
    run(void)
    {
        int listener;
        struct sockaddr_in sin;
    
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(40713);
    
        listener = socket(AF_INET, SOCK_STREAM, 0);
    
    #ifndef WIN32
        {
            int one = 1;
            setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        }
    #endif
    
        if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return;
        }
    
        if (listen(listener, 16)<0) {
            perror("listen");
            return;
        }
    
    
    
        while (1) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) {
                perror("accept");
            } else {
                if (fork() == 0) {
                    child(fd);
                    exit(0);
                }
            }
        }
    }
    
    int
    main(int c, char **v)
    {
        run();
        return 0;
    }
    

    那么,有没有更好的方案来同时处理多个连接吗?我就这样使用上面的方案回去解决我的问题了吗?
    当然不是!
    首先,在一些平台上,创建一个进程/线程的代价是很昂贵的。实际开发中你会使用一个线程池,而不是创建一个新进程。 不过,假若你需要处理数以千万个连接,维护这么多线程,性能也许没有你期待的那么美好。

    使用线程不是最好的答案。在Unix下,你可以设置sock为非阻塞,使用函数fcntl

    fcntl(fd, F_SETFL, O_NONBLOCK); 
    //fd对应于sock的文件描述符(file descriptor)
    
    //其实一般先获取sock的flag,修改flag,再设置新的flag,如下大概  
    /* Set a socket as nonblocking */
    int flags; 
    if ((flags = fcntl (fd, F_GETFL, 0)) < 0) 
         err_sys("F_GETFL error"); 
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) < 0) 
         err_sys("F_SETFL error");
    

    一旦你对sock fd设置非阻塞,那么对这个fd调用网络相关的函数,比如recv,函数会马上返回,这时你要检查返回码以及全局变量errno,上面从多个sock读取数据的代码段修改为:

    Bad Example: busy-polling all sockets

    /* This will work, but the performance will be unforgivably bad. */
    int i, n;
    char buf[1024];
    for (i=0; i < n_sockets; ++i)
        fcntl(fd[i], F_SETFL, O_NONBLOCK);
    
    while (i_still_want_to_read()) {
        for (i=0; i < n_sockets; ++i) {
            n = recv(fd[i], buf, sizeof(buf), 0);
            if (n == 0) {
                handle_close(fd[i]);
            } else if (n < 0) {
                if (errno == EAGAIN)
                     ; /* The kernel didn't have any data for us to read. */
                else
                     handle_error(fd[i], errno);
             } else {
                handle_input(fd[i], buf, n);
             }
        }
    }
    

    上面的代码也存在性能问题,2个原因:

    1. 如果没有数据到来,代码不断循环,不断消耗cpu;
    2. 每次轮询都会调用内核级别的函数(即系统调用),因为有没有数据可以读取,一般是检查内核数据buffer,这个过程由系统调用帮我们做检查。我们不断轮询,每次产生系统调用的消耗, 不是很环保的做法。

    我们需要更为智能的方式,当数据最后可读时让内核主动告诉我们。最古老的方式是使用select()
    函数原型:

    int select(int nfds, 
        fd_set *restrict readfds, 
        fd_set *restrict writefds,
         fd_set *restrict errorfds, 
         struct timeval *restrict timeout);
    

    select这个函数使用了3个sock fd集合(set of fds), 分别对应:

    1. 可读的fd集合,告诉select()请检查这个集合内的fd们,若其中某一个可读,请select返回,告诉我集合中有多少个fd有数据可以读了,下2个同意思;
    2. 可写的fd集合;
    3. 异常的fd集合;

    Example: Using select

    /* If you only have a couple dozen fds, this version won't be awful */
    fd_set readset;
    int i, n;
    char buf[1024];
    
    while (i_still_want_to_read()) {
        int maxfd = -1;
        FD_ZERO(&readset);
    
        /* Add all of the interesting fds to readset */
        for (i=0; i < n_sockets; ++i) {
             if (fd[i]>maxfd) maxfd = fd[i];
             FD_SET(fd[i], &readset);
        }
    
        /* Wait until one or more fds are ready to read */
        select(maxfd+1, &readset, NULL, NULL, NULL);
    
        /* Process all of the fds that are still set in readset */
        for (i=0; i < n_sockets; ++i) {
            if (FD_ISSET(fd[i], &readset)) {
                n = recv(fd[i], buf, sizeof(buf), 0);
                if (n == 0) {
                    handle_close(fd[i]);
                } else if (n < 0) {
                    if (errno == EAGAIN)
                         ; /* The kernel didn't have any data for us to read. */
                    else
                         handle_error(fd[i], errno);
                 } else {
                    handle_input(fd[i], buf, n);
                 }
            }
        }
    }
    

    然后,对于上面已经提到的ROT13服务端代码(也许你已经忘了),使用select():

    Example: select()-based ROT13 server

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    /* For fcntl */
    #include <fcntl.h>
    /* for select */
    #include <sys/select.h>
    
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    
    #define MAX_LINE 16384
    
    char
    rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    struct fd_state {
        char buffer[MAX_LINE];
        size_t buffer_used;
    
        int writing;
        size_t n_written;
        size_t write_upto;
    };
    
    struct fd_state *
    alloc_fd_state(void)
    {
        struct fd_state *state = malloc(sizeof(struct fd_state));
        if (!state)
            return NULL;
        state->buffer_used = state->n_written = state->writing =
            state->write_upto = 0;
        return state;
    }
    
    void
    free_fd_state(struct fd_state *state)
    {
        free(state);
    }
    
    void
    make_nonblocking(int fd)
    {
        fcntl(fd, F_SETFL, O_NONBLOCK);
    }
    
    int
    do_read(int fd, struct fd_state *state)
    {
        char buf[1024];
        int i;
        ssize_t result;
        while (1) {
            result = recv(fd, buf, sizeof(buf), 0);
            if (result <= 0)
                break;
    
            for (i=0; i < result; ++i)  {
                if (state->buffer_used < sizeof(state->buffer))
                    state->buffer[state->buffer_used++] = rot13_char(buf[i]);
                if (buf[i] == '\n') {
                    state->writing = 1;
                    state->write_upto = state->buffer_used;
                }
            }
        }
    
        if (result == 0) {
            return 1;
        } else if (result < 0) {
            if (errno == EAGAIN)
                return 0;
            return -1;
        }
    
        return 0;
    }
    
    int
    do_write(int fd, struct fd_state *state)
    {
        while (state->n_written < state->write_upto) {
            ssize_t result = send(fd, state->buffer + state->n_written,
                                  state->write_upto - state->n_written, 0);
            if (result < 0) {
                if (errno == EAGAIN)
                    return 0;
                return -1;
            }
            assert(result != 0);
    
            state->n_written += result;
        }
    
        if (state->n_written == state->buffer_used)
            state->n_written = state->write_upto = state->buffer_used = 0;
    
        state->writing = 0;
    
        return 0;
    }
    
    void
    run(void)
    {
        int listener;
        struct fd_state *state[FD_SETSIZE];
        struct sockaddr_in sin;
        int i, maxfd;
        fd_set readset, writeset, exset;
    
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(40713);
    
        for (i = 0; i < FD_SETSIZE; ++i)
            state[i] = NULL;
    
        listener = socket(AF_INET, SOCK_STREAM, 0);
        make_nonblocking(listener);
    
    #ifndef WIN32
        {
            int one = 1;
            setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        }
    #endif
    
        if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return;
        }
    
        if (listen(listener, 16)<0) {
            perror("listen");
            return;
        }
    
        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);
    
        while (1) {
            maxfd = listener;
    
            FD_ZERO(&readset);
            FD_ZERO(&writeset);
            FD_ZERO(&exset);
    
            FD_SET(listener, &readset);
    
            for (i=0; i < FD_SETSIZE; ++i) {
                if (state[i]) {
                    if (i > maxfd)
                        maxfd = i;
                    FD_SET(i, &readset);
                    if (state[i]->writing) {
                        FD_SET(i, &writeset);
                    }
                }
            }
    
            if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) {
                perror("select");
                return;
            }
    
            if (FD_ISSET(listener, &readset)) {
                struct sockaddr_storage ss;
                socklen_t slen = sizeof(ss);
                int fd = accept(listener, (struct sockaddr*)&ss, &slen);
                if (fd < 0) {
                    perror("accept");
                } else if (fd > FD_SETSIZE) {
                    close(fd);
                } else {
                    make_nonblocking(fd);
                    state[fd] = alloc_fd_state();
                    assert(state[fd]);/*XXX*/
                }
            }
    
            for (i=0; i < maxfd+1; ++i) {
                int r = 0;
                if (i == listener)
                    continue;
    
                if (FD_ISSET(i, &readset)) {
                    r = do_read(i, state[i]);
                }
                if (r == 0 && FD_ISSET(i, &writeset)) {
                    r = do_write(i, state[i]);
                }
                if (r) {
                    free_fd_state(state[i]);
                    state[i] = NULL;
                    close(i);
                }
            }
        }
    }
    
    int
    main(int c, char **v)
    {
        setvbuf(stdout, NULL, _IONBF, 0);
    
        run();
        return 0;
    }
    

    但是我们还没完成。随着每个fd集合中fd数量的增多,每次检查也相应要花费更多时间。
    不同的系统提供不同的优化方案,包括poll(), epoll(), kqueue(), evports, /dev/poll。
    所有这些优化都能获得更好的性能,而且除了poll(),其他的函数,增加、删除一个fd,或者测试sock是否可读写,这3个操作都是O(1)的效率。

    可惜这些优化的方案,都不是标准。linux使用epoll(),BSDs(包括苹果内核)使用kqueue(),Solaris使用evports和/dev/poll, 致命的是,同个系统只使用他们的优化方案,不包括其他,比如linux上就没有使用kqueue()。

    所以,如果你想要写一个高性能异步IO的程序,若考虑移植和跨平台,你还需要做一些额外的包装。

    于是,Libevent的API做了上面提到的这些。

    马上再看一个异步IO的ROTI13服务端代码。这次我们使用Libevent2来代替select()。注意fd_sets被替换了,现在使用的是事件events。

    Example: A low-level ROT13 server with Libevent

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    /* For fcntl */
    #include <fcntl.h>
    
    #include <event2/event.h>
    
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    
    #define MAX_LINE 16384
    
    void do_read(evutil_socket_t fd, short events, void *arg);
    void do_write(evutil_socket_t fd, short events, void *arg);
    
    char
    rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    struct fd_state {
        char buffer[MAX_LINE];
        size_t buffer_used;
    
        size_t n_written;
        size_t write_upto;
    
        struct event *read_event;
        struct event *write_event;
    };
    
    struct fd_state *
    alloc_fd_state(struct event_base *base, evutil_socket_t fd)
    {
        struct fd_state *state = malloc(sizeof(struct fd_state));
        if (!state)
            return NULL;
        state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
        if (!state->read_event) {
            free(state);
            return NULL;
        }
        state->write_event =
            event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
    
        if (!state->write_event) {
            event_free(state->read_event);
            free(state);
            return NULL;
        }
    
        state->buffer_used = state->n_written = state->write_upto = 0;
    
        assert(state->write_event);
        return state;
    }
    
    void
    free_fd_state(struct fd_state *state)
    {
        event_free(state->read_event);
        event_free(state->write_event);
        free(state);
    }
    
    void
    do_read(evutil_socket_t fd, short events, void *arg)
    {
        struct fd_state *state = arg;
        char buf[1024];
        int i;
        ssize_t result;
        while (1) {
            assert(state->write_event);
            result = recv(fd, buf, sizeof(buf), 0);
            if (result <= 0)
                break;
    
            for (i=0; i < result; ++i)  {
                if (state->buffer_used < sizeof(state->buffer))
                    state->buffer[state->buffer_used++] = rot13_char(buf[i]);
                if (buf[i] == '\n') {
                    assert(state->write_event);
                    event_add(state->write_event, NULL);
                    state->write_upto = state->buffer_used;
                }
            }
        }
    
        if (result == 0) {
            free_fd_state(state);
        } else if (result < 0) {
            if (errno == EAGAIN) /* XXXX use evutil macro */
                return;
            perror("recv");
            free_fd_state(state);
        }
    }
    
    void
    do_write(evutil_socket_t fd, short events, void *arg)
    {
        struct fd_state *state = arg;
    
        while (state->n_written < state->write_upto) {
            ssize_t result = send(fd, state->buffer + state->n_written,
                                  state->write_upto - state->n_written, 0);
            if (result < 0) {
                if (errno == EAGAIN) /* XXX use evutil macro */
                    return;
                free_fd_state(state);
                return;
            }
            assert(result != 0);
    
            state->n_written += result;
        }
    
        if (state->n_written == state->buffer_used)
            state->n_written = state->write_upto = state->buffer_used = 1;
    
        event_del(state->write_event);
    }
    
    void
    do_accept(evutil_socket_t listener, short event, void *arg)
    {
        struct event_base *base = arg;
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr*)&ss, &slen);
        if (fd < 0) { // XXXX eagain??
            perror("accept");
        } else if (fd > FD_SETSIZE) {
            close(fd); /* XXX replace all closes with EVUTIL_CLOSESOCKET */
        } else {
            struct fd_state *state;
            evutil_make_socket_nonblocking(fd);
            state = alloc_fd_state(base, fd);
            assert(state); /*XXX err*/
            assert(state->write_event);
            event_add(state->read_event, NULL);
        }
    }
    
    void
    run(void)
    {
        evutil_socket_t listener;
        struct sockaddr_in sin;
        struct event_base *base;
        struct event *listener_event;
    
        base = event_base_new();
        if (!base)
            return; /*XXXerr*/
    
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(40713);
    
        listener = socket(AF_INET, SOCK_STREAM, 0);
        evutil_make_socket_nonblocking(listener);
    
    #ifndef WIN32
        {
            int one = 1;
            setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        }
    #endif
    
        if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return;
        }
    
        if (listen(listener, 16)<0) {
            perror("listen");
            return;
        }
    
        listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
        /*XXX check it */
        event_add(listener_event, NULL);
    
        event_base_dispatch(base);
    }
    
    int
    main(int c, char **v)
    {
        setvbuf(stdout, NULL, _IONBF, 0);
    
        run();
        return 0;
    }
    

    注意到,sockets的数据类型不是int,我们做了包装: evutil_socket_t; 不是使用fcntl(O_NONBLOCK)来设置sock非阻塞,替换成evutil_make_socket_nonblocking这个函数。这些封装,让这个程序也能兼容windows的API。


    使用Libevent方便吗,在Windows下又如何

    也许你注意到最后的代码更为高效,也更为复杂。

    回过头来看,我们并没有为每个连接维护一个buffer,每个进程只是使用一份栈上分配的内存。不需要精准跟踪当前是哪个sock的读写,也不需跟踪当前有多少sock完成了操作,我们只是循环,推出栈上的变量。

    还有,如果你有Windows丰富的网络编程经验,你将会注意到,按上面的模型使用Libevent写的程序,在Windows下可能不会有很好的性能。在Windows下并不是使用类似select的调用来实现快速异步IO,而是使用IOCP (IO Completion Ports)这个API。

    不像其他所有的快速网络API,IOCP并不会通知你的程序说哪些sock准备好读写了。反过来,需要先由程序告诉Windows networking stack开始网络操作,然后由IOCP来告知哪些操作完成了。(popozhu注:我的理解是,linux是等内核通知你可以读或写,读写操作可以马上非阻塞地返回,而Windows则是程序先做读或写,然后程序苦等IOCP把执行结果告知程序。前者是发起读写,后者是结束读写,后者可能存在等待吧)。

    幸运的是,Libevent2 “bufferevents”的接口解决了这个问题,让接口更容易使用,而且提供了适用于linux和windows的高效接口。
    于是,ROT13服务端代码最终版,使用bufferevents这个API。

    Example: A simpler ROT13 server with Libevent

    /* For sockaddr_in */
    #include <netinet/in.h>
    /* For socket functions */
    #include <sys/socket.h>
    /* For fcntl */
    #include <fcntl.h>
    
    #include <event2/event.h>
    #include <event2/buffer.h>
    #include <event2/bufferevent.h>
    
    #include <assert.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <errno.h>
    
    #define MAX_LINE 16384
    
    void do_read(evutil_socket_t fd, short events, void *arg);
    void do_write(evutil_socket_t fd, short events, void *arg);
    
    char
    rot13_char(char c)
    {
        /* We don't want to use isalpha here; setting the locale would change
         * which characters are considered alphabetical. */
        if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
            return c + 13;
        else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
            return c - 13;
        else
            return c;
    }
    
    void
    readcb(struct bufferevent *bev, void *ctx)
    {
        struct evbuffer *input, *output;
        char *line;
        size_t n;
        int i;
        input = bufferevent_get_input(bev);
        output = bufferevent_get_output(bev);
    
        while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
            for (i = 0; i < n; ++i)
                line[i] = rot13_char(line[i]);
            evbuffer_add(output, line, n);
            evbuffer_add(output, "\n", 1);
            free(line);
        }
    
        if (evbuffer_get_length(input) >= MAX_LINE) {
            /* Too long; just process what there is and go on so that the buffer
             * doesn't grow infinitely long. */
            char buf[1024];
            while (evbuffer_get_length(input)) {
                int n = evbuffer_remove(input, buf, sizeof(buf));
                for (i = 0; i < n; ++i)
                    buf[i] = rot13_char(buf[i]);
                evbuffer_add(output, buf, n);
            }
            evbuffer_add(output, "\n", 1);
        }
    }
    
    void
    errorcb(struct bufferevent *bev, short error, void *ctx)
    {
        if (error & BEV_EVENT_EOF) {
            /* connection has been closed, do any clean up here */
            /* ... */
        } else if (error & BEV_EVENT_ERROR) {
            /* check errno to see what error occurred */
            /* ... */
        } else if (error & BEV_EVENT_TIMEOUT) {
            /* must be a timeout event handle, handle it */
            /* ... */
        }
        bufferevent_free(bev);
    }
    
    void
    do_accept(evutil_socket_t listener, short event, void *arg)
    {
        struct event_base *base = arg;
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr*)&ss, &slen);
        if (fd < 0) {
            perror("accept");
        } else if (fd > FD_SETSIZE) {
            close(fd);
        } else {
            struct bufferevent *bev;
            evutil_make_socket_nonblocking(fd);
            bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
            bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
            bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
            bufferevent_enable(bev, EV_READ|EV_WRITE);
        }
    }
    
    void
    run(void)
    {
        evutil_socket_t listener;
        struct sockaddr_in sin;
        struct event_base *base;
        struct event *listener_event;
    
        base = event_base_new();
        if (!base)
            return; /*XXXerr*/
    
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = 0;
        sin.sin_port = htons(40713);
    
        listener = socket(AF_INET, SOCK_STREAM, 0);
        evutil_make_socket_nonblocking(listener);
    
    #ifndef WIN32
        {
            int one = 1;
            setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        }
    #endif
    
        if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
            perror("bind");
            return;
        }
    
        if (listen(listener, 16)<0) {
            perror("listen");
            return;
        }
    
        listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
        /*XXX check it */
        event_add(listener_event, NULL);
    
        event_base_dispatch(base);
    }
    
    int
    main(int c, char **v)
    {
        setvbuf(stdout, NULL, _IONBF, 0);
    
        run();
        return 0;
    }
    


    真的有这么高效率?

    官方压测文档是很久之前的了,文档作者表示怀疑。

    – EOF –

    Categories: in_lib
    Tags: libevent