- bufferevent和缓存
- 回调函数和水位标志(watermark)
- 延迟的回调
- bufferevent可选的标志
- 使用基于socket的bufferevent
- bufferevent通用操作
- 类型相关的bufferevent函数
原文:http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html
很多时候, 一个程序除了响应事件之外还想缓存一些数据。比如当我们想写数据,通常会用这样的模式:
- 向一个连接写一些数据,先把数据存放到一个buffer里
- 等待连接变成可写状态
- 尽可能写入数据
- 记住我们写了多少数据,以及还有多少数据未写,然后等待下次连接再变成可写状态。
Libevent为这种带缓存的IO模式提供了一个通用的机制。一个”bufferevent”包含一个底层传输(比如socket),一个读buffer和一个写buffer。当底层传输可读写时就调用回调函数,这是普通事件的处理,而bufferevent是在读或写足够的数据时才调用用户指定的回调函数。
有多种类型的bufferevent使用相同的接口,比如下面的类型:
- socket-based bufferevents
这种bufferevent的底层传输为stream socket,使用event_*
这种接口作为它的后端。 - asynchronous-IO bufferevents
限于Windows,实验中。这种类型的bufferevent使用Windows IOCP接口来读写数据。 - filtering bufferevents
在数据被传送到bufferevent事件对象之前,这种buffevent会对将要读或写的数据进行预处理,比如压缩或转换数据。 - paired bufferevents
相互之间传输数据的一对bufferevent。
注意:
在Libevent 2.0.2-alpha这个版本,bufferevent的接口还不能通用于上面列出的bufferevent类型,也就是说,下面列出的每个接口不一定适用于所有bufferevent类型。未来版本中可能会改善。
再次注意:
bufferevent当前只能适用于面向流的协议,比如TCP,而面向数据报文的协议,比如UDP,未来可能也会支持。
bufferevent和缓存
每个bufferevent都有一个输入缓存和一个输出缓存,对应的数据结构为struct evbuffer
。当你对一个bufferevent写数据,数据写入到输出缓存;当有数据可读时,你是从输入缓存中提取数据。
evbuffer的接口支持多种操作,后一节中会讨论到。
回调函数和水位标志(watermark)
每个bufferevent有2个相关连的回调函数:读回调和写回调。默认地,当从底层传输读到数据,读回调就被调用,当输出缓存的数据被清空,写回调就调用。但你可以调整bufferevent的“水位标志”来覆盖这些函数的默认行为。
每个bufferevent有4个水位标志:
- 读低水位Read low-water mark
当一个读操作让bufferevent的输入缓存达到或超出这个水位,读回调就被调用。默认为0,所以每个读操作都会导致读回调被执行。 - 读高水位Read high-water mark
当bufferevent的输入缓存达到这个水位,bufferevent停止读取数据,直到数据被取出,再次低于这个水位为止。 默认是不做限制,所以会不停地读取数据到输入缓存里。 - 写低水位Write low-water mark
当一个输出缓存达到或低于这个水位,写回调就被调用。默认为0,所以只有当输出缓存的数据全部发送出去之后,写回调才被调用。 - 写高水位Write high-water mark
bufferevent不直接使用这个水位,当一个bufferevent被用于另一个bufferevent的地层传输时,这个标志可以有特殊的含义。见下面的filtering bufferevents。
一个bufferevent也有一个error
和event
回调函数,这2个函数与数据无关,当一个连接被关闭或出现错误,它们会被调用来通知到应用程序。定义了如下的事件标志:
- BEV_EVENT_READING
表示当bufferevent在读操作时有一个事件到来。 - BEV_EVENT_WRITING
表示当bufferevent在写操作时有一个事件到来。 - BEV_EVENT_ERROR
表示bufferevent在操作时有错误发生。需要更多关于错误信息,调用EVUTIL_SOCKET_ERROR()
。 - BEV_EVENT_TIMEOUT
bufferevent的超时事件。 - BEV_EVENT_EOF
到达文件末尾。 - BEV_EVENT_CONNECTED
bufferevent完成一个另一端发起请求的连接。
延迟的回调
默认上,当对应的情况发生,buffevent回调函数会马上被执行。(对于evbuffer的回调函数也是如此)当存在复杂依赖时这个立刻调用却反而带来麻烦。比如有一个回调函数,当evbuffer A里的数据为空时执行,把数据写到buffer里,另有一个回调函数,当evbuffer A的buffer数据满了时实行,把数据从buffer里读出。由于这些回调都在栈上执行,如果依赖足够冗长,则可能会出现栈溢出。
为了解决这个问题,你可以告诉bufferevent(或者evbuffer)应该推迟执行它的回调函数。这样当条件满足后回调不会立刻执行,而是进入event_loop()的队列里,当队列里的常规回调函数执行完毕后,这个延迟的回调函数才被调用。
bufferevent可选的标志
在创建一个bufferevent时你可以使用一个或多个标志。可用的标志有:
- BEV_OPT_CLOSE_ON_FREE
当bufferevent被释放时,关闭底层传输。这会关闭一个底层的socker,释放一个底层的bufferevent等。 - BEV_OPT_THREADSAFE
自动为bufferevent分配锁,所以可安全用于多线程。 - BEV_OPT_DEFER_CALLBACKS
如果设置了这个标志,bufferevent推迟所有回调的执行,如上所述。 - BEV_OPT_UNLOCK_CALLBACKS
默认上若一个bufferevent设置成线程安全,则当用户提供的回调调用时会获取该bufferevent的锁。设置这个标志来让Libevent在完成你的回调函数后释放bufferevent的锁。
使用基于socket的bufferevent
基于socket的bufferevent使用起来最简单。基于socket的bufferevent使用Libevent的底层事件机制来检测底层网络socket是否可读写,也使用底层网络调用(比如readv,writev,WSASend,或者WSARecv)来传输和接收数据。
创建一个基于socket的bufferevent
使用bufferevent_socket_new()
来创建。接口如下:
struct bufferevent *bufferevent_socket_new(
struct event_base *base,
evutil_socket_t fd,
enum bufferevent_options options);
base
是event_base,options
是bufferevent标志的位掩码(BEV_OPT_CLOSE_ON_FREE等)。而fd
则是可选的socket fd,如果你想稍后再设置fd,可以传-1。
提示: 确保你提供的socket是非阻塞模式的,Libevent提供了一个便捷的函数evutil_make_socket_nonblocking()
来设置一个socket为非阻塞。
如果成功,这个函数返回一个bufferevent,如果失败,返回NULL。
在bufferevent上启用连接
如果bufferevent的socket还没连接,你可以创建一个连接,接口:
int bufferevent_socket_connect(struct bufferevent *bev,
struct sockaddr *address, int addrlen);
参数address
和addrlen
跟标准的connect()
一样。如果bufferevent还没有一个socker,调用这个函数会为它创建一个新的socket,并设置socket为非阻塞。
如果bufferevent已经有一个socket,调用bufferevent_socket_connect()
告诉Libevent该socket未连接,在连接成功之前不会有读或写操作返回。
在连接返回前向输出buffer添加数据则是可以的。
如果连接成功建立,这个函数返回0,如果出现错误,返回-1。
Example
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <sys/socket.h>
#include <string.h>
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
/* We're connected to 127.0.0.1:8080. Ordinarily we'd do
something here, like start reading or writing. */
} else if (events & BEV_EVENT_ERROR) {
/* An error occured while connecting. */
}
}
int main_loop(void)
{
struct event_base *base;
struct bufferevent *bev;
struct sockaddr_in sin;
base = event_base_new();
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
sin.sin_port = htons(8080); /* Port 8080 */
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, NULL, NULL, eventcb, NULL);
if (bufferevent_socket_connect(bev,
(struct sockaddr *)&sin, sizeof(sin)) < 0) {
/* Error starting connection */
bufferevent_free(bev);
return -1;
}
event_base_dispatch(base);
return 0;
}
注意如果你尝试使用bufferevent_socket_connect()
来触发调用connect()
,你只会有一个BEV_EVENT_CONNECTED
事件。如果你自己调用connect()
,该连接触发的是一个写事件。
使用hostname来建立连接
更普遍的做法是把解析host和创建连接合并为一个操作,这里有一个接口:
int bufferevent_socket_connect_hostname(struct bufferevent *bev,
struct evdns_base *dns_base, int family, const char *hostname,
int port);
int bufferevent_socket_get_dns_error(struct bufferevent *bev);
这个函数解析DNS名字hostname
,查找地址的类属(AF_INET,AF_INET6和AF_UNSPEC)。如果解析失败则调用错误回调函数,如果成功则创建一个连接。
参数dns_base
可选,如果为NULL,Libevent会阻塞直到完成域名查找,而这一般非你所愿。如果不为NULL,Libevent会使用它来异步查找域名。
跟bufferevent_socket_connect()
一样,这个函数告诉Libevent任何已有的socket是未连接的,在完成域名解析且连接成功创建和返回前,该连接的读或写操作不应返回。
如果出现错误,可能是DNS域名查找出错,你可以调用bufferevent_socket_get_dns_error()
来查看最近的错误。如果让会的error为0,则检测不到DNS错误。
Example: Trivial HTTP v0 client
/* Don't actually copy this code: it is a poor way to implement an
HTTP client. Have a look at evhttp instead.
*/
#include <event2/dns.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <event2/event.h>
#include <stdio.h>
void readcb(struct bufferevent *bev, void *ptr)
{
char buf[1024];
int n;
struct evbuffer *input = bufferevent_get_input(bev);
while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
fwrite(buf, 1, n, stdout);
}
}
void eventcb(struct bufferevent *bev, short events, void *ptr)
{
if (events & BEV_EVENT_CONNECTED) {
printf("Connect okay.\n");
} else if (events & (BEV_EVENT_ERROR|BEV_EVENT_EOF)) {
struct event_base *base = ptr;
if (events & BEV_EVENT_ERROR) {
int err = bufferevent_socket_get_dns_error(bev);
if (err)
printf("DNS error: %s\n", evutil_gai_strerror(err));
}
printf("Closing\n");
bufferevent_free(bev);
event_base_loopexit(base, NULL);
}
}
int main(int argc, char **argv)
{
struct event_base *base;
struct evdns_base *dns_base;
struct bufferevent *bev;
if (argc != 3) {
printf("Trivial HTTP 0.x client\n"
"Syntax: %s [hostname] [resource]\n"
"Example: %s www.google.com /\n",argv[0],argv[0]);
return 1;
}
base = event_base_new();
dns_base = evdns_base_new(base, 1);
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, readcb, NULL, eventcb, base);
bufferevent_enable(bev, EV_READ|EV_WRITE);
evbuffer_add_printf(bufferevent_get_output(bev), "GET %s\r\n", argv[2]);
bufferevent_socket_connect_hostname(
bev, dns_base, AF_UNSPEC, argv[1], 80);
event_base_dispatch(base);
return 0;
}
bufferevent通用操作
这节的函数适用于多种bufferevent实现。
释放一个bufferevent
void bufferevent_free(struct bufferevent *bev);
Bufferevent在内部有引用计数,如果当你释放它时它有延迟的回调函数,则再回调被执行前它不会被释放。
然而bufferevent_free()
会尝试尽快释放bufferevent,如果bufferevent上有等待写的数据,在bufferevent被释放前它可能不会刷新缓存。
如果设置了BEV_OPT_CLOSE_ON_FREE
标志,且该buffereavent有一个socket或一个底层bufferevent,这个底层传输会在释放时被关闭。
操作回调、设置水位和启用功能
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
typedef void (*bufferevent_event_cb)(struct bufferevent *bev,
short events, void *ctx);
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
void bufferevent_getcb(struct bufferevent *bufev,
bufferevent_data_cb *readcb_ptr,
bufferevent_data_cb *writecb_ptr,
bufferevent_event_cb *eventcb_ptr,
void **cbarg_ptr);
函数bufferevent_setcb()
改变一个多或多个回调函数。参数readcb, writecb和eventcb在可读、可写或有事件被触发时被调用(各自被调用)。最后一个用户提供的参数cbarg将作为参数传给bufferevent_callcb()
:你可以使用它来传递数据给你的回调函数。回调函数的参数events则是事件标志的位掩码。
你若给某个回调函数入参传一个NULL指针,则禁用该回调函数。注意到所有的回调函数共享入参cbarg,修改该参数会影响所有回调函数。
启用/禁用
void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);
short bufferevent_get_enabled(struct bufferevent *bufev);
你可以在一个bufferevent上启用或禁用指定的事件:EV_READ, EV_WRITE, 或EV_READ|EV_WRITE。当读或写被禁用后,bufferevent将不会尝试读或写数据。
当输出缓存为空时没有必要禁用写操作:bufferevent会自动停止写,当有缓存里有数据时会自动重新开始写socket。
类似的,当输入缓存达到最高水位时也没必要禁用读操作,bufferevent会自动停止再读入数据,当输入缓存又有空间时bufferevent会自动重新开始读入数据到缓存。
默认上,一个新创建的bufferevent启用了写操作,但不包括读操作(读写buffer的操作)。
可以调用bufferevent_get_enabled()
来查看bufferevent当前启用了哪些事件。
设置水位
void bufferevent_setwatermark(struct bufferevent *bufev, short events,
size_t lowmark, size_t highmark);
函数bufferevent_setwatermark()
调整读、写水位,或都调整。 如果参数events设置了EV_READ,则调整读水位,如果设置EV_WRITE则调整写水位。
高水位如果设置为0,则表示不限制。
Example
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/util.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
struct info {
const char *name;
size_t total_drained;
};
void read_callback(struct bufferevent *bev, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
size_t len = evbuffer_get_length(input);
if (len) {
inf->total_drained += len;
evbuffer_drain(input, len);
printf("Drained %lu bytes from %s\n",
(unsigned long) len, inf->name);
}
}
void event_callback(struct bufferevent *bev, short events, void *ctx)
{
struct info *inf = ctx;
struct evbuffer *input = bufferevent_get_input(bev);
int finished = 0;
if (events & BEV_EVENT_EOF) {
size_t len = evbuffer_get_length(input);
printf("Got a close from %s. We drained %lu bytes from it, "
"and have %lu left.\n", inf->name,
(unsigned long)inf->total_drained, (unsigned long)len);
finished = 1;
}
if (events & BEV_EVENT_ERROR) {
printf("Got an error from %s: %s\n",
inf->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
finished = 1;
}
if (finished) {
free(ctx);
bufferevent_free(bev);
}
}
struct bufferevent *setup_bufferevent(void)
{
struct bufferevent *b1 = NULL;
struct info *info1;
info1 = malloc(sizeof(struct info));
info1->name = "buffer 1";
info1->total_drained = 0;
/* ... Here we should set up the bufferevent and make sure it gets
connected... */
/* Trigger the read callback only whenever there is at least 128 bytes
of data in the buffer. */
bufferevent_setwatermark(b1, EV_READ, 128, 0);
bufferevent_setcb(b1, read_callback, NULL, event_callback, info1);
bufferevent_enable(b1, EV_READ); /* Start reading. */
return b1;
}
操作bufferevent里的数据
bufferevent提供一些函数让你可以向网络中读写数据:
Interface
struct evbuffer *bufferevent_get_input(struct bufferevent *bufev);
struct evbuffer *bufferevent_get_output(struct bufferevent *bufev);
这两个基础函数非常有用,分别返回输入缓存和输出缓存。evbuffer更多操作函数,见下节。
注意,程序可能只从输入缓存里读取数据(而不添加数据),只向输出缓存里添加数据(而不删除)。
如果输出缓存里太少数据而导致bufferevent的写操作被推迟,可以向输出缓存里添加数据,bufferevent就会自动重新开始把输出缓存里的数据写socket。对于输入缓存也类似。
Interface
int bufferevent_write(struct bufferevent *bufev,
const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
这2个函数添加数据到输出缓存里,调用bufferevent_write()
把内存里data位置的size字节添加到输出缓存的末尾。调用bufferevent_write_buffer()
则把buf的整块内存添加到输出缓存的末端,并把buf的数据删除。
成功返回0,遇到出错返回-1。
Interface
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev,
struct evbuffer *buf);
这2个函数从输入缓存里读取数据。调用bufferevent_read()从输入缓存里删除size字节,存到内存的data位置,返回被删除的字节数。bufferevent_read_buffer()则删除输入缓存里的所有数据,存放到buf里,成功返回0,失败返回-1。
注意使用bufferevent_read(),内存data位置一定要有足够的空间来容纳size字节的数据。
Example
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <ctype.h>
void
read_callback_uppercase(struct bufferevent *bev, void *ctx)
{
/* This callback removes the data from bev's input buffer 128
bytes at a time, uppercases it, and starts sending it
back.
(Watch out! In practice, you shouldn't use toupper to implement
a network protocol, unless you know for a fact that the current
locale is the one you want to be using.)
*/
char tmp[128];
size_t n;
int i;
while (1) {
n = bufferevent_read(bev, tmp, sizeof(tmp));
if (n <= 0)
break; /* No more data. */
for (i=0; i<n; ++i)
tmp[i] = toupper(tmp[i]);
bufferevent_write(bev, tmp, n);
}
}
struct proxy_info {
struct bufferevent *other_bev;
};
void
read_callback_proxy(struct bufferevent *bev, void *ctx)
{
/* You might use a function like this if you're implementing
a simple proxy: it will take data from one connection (on
bev), and write it to another, copying as little as
possible. */
struct proxy_info *inf = ctx;
bufferevent_read_buffer(bev,
bufferevent_get_output(inf->other_bev));
}
struct count {
unsigned long last_fib[2];
};
void
write_callback_fibonacci(struct bufferevent *bev, void *ctx)
{
/* Here's a callback that adds some Fibonacci numbers to the
output buffer of bev. It stops once we have added 1k of
data; once this data is drained, we'll add more. */
struct count *c = ctx;
struct evbuffer *tmp = evbuffer_new();
while (evbuffer_get_length(tmp) < 1024) {
unsigned long next = c->last_fib[0] + c->last_fib[1];
c->last_fib[0] = c->last_fib[1];
c->last_fib[1] = next;
evbuffer_add_printf(tmp, "%lu", next);
}
/* Now we add the whole contents of tmp to bev. */
bufferevent_write_buffer(bev, tmp);
/* We don't need tmp any longer. */
evbuffer_free(tmp);
}
读和写的超时
跟其他事件一样,bufferevent如果一定时间之后都没有读写数据,会执行超时回调函数。
Interface
void bufferevent_set_timeouts(struct bufferevent *bufev,
const struct timeval *timeout_read, const struct timeval *timeout_write);
传NULL给超时入参来删除对应的超时回调,然后在Libevent 2.1.2-alpha之前并不适用所有的事件类型。
注意,超时时间只有在bufferevent尝试读或写才开始计算,换句话说,如果bufferevent禁用读操作,或者输入缓存满了(达到最高水位),读操作超时时间不会启用。同样,写操作超时时间也并不会被启用如果禁用了些操作,或者没有数据可写。
如果读或写超时发生了,对应的读或写操作会被停止。事件回调函数会被执行,以标识BEV_EVENT_TIMEOUT|BEV_EVENT_READING
或BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING
。
刷新bufferevent
Interface
int bufferevent_flush(struct bufferevent *bufev,
short iotype, enum bufferevent_flush_mode state);
刷新bufferevent来告诉bufferevent忽略其他限制读或写操作的设置,强制从底层传输读或写尽可能多的数据。
参数iotype应为EV_READ, EV_WRITE, 或EV_READ|EV_WRITE,表示应该处理读、写操作,或两者。
参数state应为BEV_NORMAL, BEV_FLUSH, 或BEV_FINISHED。BEV_FINISHED表示应该告诉另一端数据发送完毕,BEV_NORMAL和BEV_FLUSH的差别跟bufferevent的类型有关。
函数bufferevent_flush()失败的话返回-1,如果没有数据可刷新,返回0,如果刷新了数据,返回1。
类型相关的bufferevent函数
这些函数并不支持所有类型的bufferevent。
Interface
int bufferevent_priority_set(struct bufferevent *bufev, int pri);
int bufferevent_get_priority(struct bufferevent *bufev);
调整bufev的优先级为pri。参见event_priority_set()
获取更多优先级的信息。
成功返回0,失败返回-1,只适用于基于socket的bufferevent。
Interface
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);
设置或获取基于fd事件的文件描述符fd,只支持基于socket的bufferevent。失败返回-1,setfd()
成功则返回0。
Interface
struct event_base *bufferevent_get_base(struct bufferevent *bev);
返回bufferevent的event_base。
Interface
struct bufferevent *bufferevent_get_underlying(struct bufferevent *bufev);
当底层传输使用的是另一个bufferevent,通过这个函数来获取这个底层传输的bufferevent。
手工加锁或解锁bufferevent
跟evbuffer一样,有时候你想确保bufferevent上得一些操作是原子操作的。Libevent提供供你手工加锁和解锁的函数。
Interface
void bufferevent_lock(struct bufferevent *bufev);
void bufferevent_unlock(struct bufferevent *bufev);
注意,如果bufferevent在创建时如果没有提供BEV_OPT_THREADSAFE线程,或者启用Libevent的线程支持,加锁一个bufferevent是没有效果的。
加锁bufferevent也会锁上它关联的evbuffer。这些函数是可以重入的:对一个已获取的锁再次加锁是安全的,当然,也要由对应的解锁调用。
废弃的bufferevent功能
略。
完成于 2013年 7月 6日 星期六 22时42分03秒 CST
– EOF –