原文链接:

ev_io

ev_io用于句柄可读写的检测,如果你有如下需求,你就可能需要用到它。

  • 在网络对端发送的数据已到达本机时,及时读取socket中的数据
  • 管道中已经写满数据,在管道可写时及时写入数据


可用函数如下:

ev_io_init (ev_io *, callback, int fd, int events);
ev_io_set (ev_io *, int fd, int events);

void ev_io_start(EV_P_ ev_io *w) EV_THROW;
void ev_io_stop(EV_P_ ev_io *w) EV_THROW;


符号定义:

#if EV_MINPRI == EV_MAXPRI
# define EV_DECL_PRIORITY
#elif !defined (EV_DECL_PRIORITY)
# define EV_DECL_PRIORITY int priority;
#endif

#ifndef EV_COMMON
# define EV_COMMON void *data;
#endif

#ifndef EV_CB_DECLARE
# define EV_CB_DECLARE(type) void (*cb)(EV_P_ struct type *w, int revents);
#endif

#define EV_WATCHER(type)			\
  int active; /* private */			\
  int pending; /* private */			\
  EV_DECL_PRIORITY /* private */		\
  EV_COMMON /* rw */				\
  EV_CB_DECLARE (type) /* private */

typedef struct ev_watcher_list
{
  EV_WATCHER_LIST (ev_watcher_list)
} ev_watcher_list;

#define EV_WATCHER_LIST(type)			\
  EV_WATCHER (type)				\
  struct ev_watcher_list *next; /* private */

typedef struct ev_io
{
  EV_WATCHER_LIST (ev_io)
  int fd;     /* ro */
  int events; /* ro */
} ev_io;

上面符号定义有点多,gcc -E xxx.c编译出来的结果:

typedef struct ev_io
{
  int active; 
  int pending; 
  int priority;
  void *data; 
  void (*cb)(struct ev_loop *loop, struct ev_io *w, int revents); 
  struct ev_watcher_list *next;
  int fd;
  int events;
} ev_io;

sample1

这个例子可以在有标准输入时打印输入的内容。每次回调不需要重新设置监听的,默认就是一直监听着句柄的读事件。

void stdin_readable_cb(struct ev_loop *loop, ev_io *w, int revents)
{
    char buf[64] = {0};
    read(w->fd, buf, sizeof(buf) - 1);
    printf("%s\n", buf);
}
int main()
{
    struct ev_loop *loop = EV_DEFAULT;
    ev_io stdin_readable;

    ev_io_init(&stdin_readable, stdin_readable_cb, STDIN_FILENO, EV_READ);
    ev_io_start(loop, &stdin_readable);

    ev_run(loop, 0);
    return 0;
}

sample2

sample1还存在一些问题,就是如何在socket关闭时把这个事件停掉以免浪费资源,看下面这个例子。

如果对端socket已经关闭了,那么这个句柄就是处于可读的状态,libev会一直调用readable_cb直到你主动把这个io事件停掉,因为下面这个例子只有一个io事件,停掉了自然就会终止进程了。检测对端socket关闭是普通的tcp常识,检测一下recv的返回值即可。

Tips:拿家里路由器管理页面(192.168.1.1)来测试一下。

// 读事件回调
void readable_cb(struct ev_loop *loop, ev_io *w, int revents)
{
    char buf[64] = {0};
    ssize_t rlen = recv(w->fd, buf, sizeof(buf) - 1, 0);
    if (rlen == 0)
    {
        // 对端关闭,stop掉fd
        printf("\n=================== stop ==================\n");
        close(w->fd);
        ev_io_stop(loop, w);
    }
    else if (rlen < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
    {
        // 被打断,重新监听即可
        printf("\n=================== break ==================\n");
    }
    else if (rlen < 0)
    {
        // 出错
        printf("%s\n", strerror(errno));
    }
    else
    {
        // 打印正常接收到的数据
        printf("%s", buf);
    }
}

#define HTTP_HEADER "GET / HTTP/1.1\r\nHost: 192.168.1.1\r\nConnection:keep-alive\r\n\r\n"

int main()
{
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        printf("create sock failed\n");
        return 0;
    }

    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("192.168.1.1");
    addr.sin_port = htons(80);
	// 连接对端
    if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        printf("connect failed\n");
        return 0;
    }
	// 发送GET请求
    if (send(sock, HTTP_HEADER, sizeof(HTTP_HEADER), 0) != sizeof(HTTP_HEADER)) {
        printf("send failed\n");
        return 0;
    }
	// 设置为非阻塞
    int flags = fcntl(sock, F_GETFL);
    if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) != 0)
    {
        printf("%s\n", strerror(errno));
        return 0;
    }
	// 开始监听读事件
    struct ev_loop *loop = EV_DEFAULT;
    ev_io http_io;

    ev_io_init(&http_io, readable_cb, sock, EV_READ);
    ev_io_start(loop, &http_io);

    ev_run(loop, 0);
    return 0;
}

阻塞与非阻塞

句柄最好设置成非阻塞模式,除非你知道自己在做什么。官网提到了要小心可能会收到假的触发事件,比如给某个fd设置了EV_READ事件,在回调的函数里可能会发现并没有数据到达,具体的表现就是阻塞型fd使用read函数会阻塞,非阻塞型fd使用read函数会收到EAGAIN。这就是假的触发事件,这种情况应该当作啥事都没有发生过,继续监听事件到达。

select手册 中的Bugs章节提到了这样一个bug:

Under Linux, select() may report a socket file descriptor as "ready for reading", while nevertheless a subsequent read blocks. This could for example happen when data has arrived but upon examination has wrong checksum and is discarded. There may be other circumstances in which a file descriptor is spuriously reported as ready. Thus it may be safer to use O_NONBLOCK on sockets that should not block.

大意就是,由于各种原因,select可能会因为假的事件而返回,比如select已经返回,但是数据包的checksum不对而被丢弃,所以导致read读不到数据。但是bug为何至今不修复,应该是有其原因的,stackoverflow上有个同样的问题Spurious readiness notification for Select System call

如果你由于某种原因必须使用阻塞模式,你必须在读之前再次测试是否真的有数据可读,否则可能永远阻塞在那里了。有些人就会用SIGALRM和定时器来保证进程不会被永久阻塞。不过推荐最好还是用非阻塞模式。

复制的句柄

复制的句柄可能只有其中一个会收到触发的事件,比如先将标准出错句柄2关闭,再使用dup函数将标准输出1复制到句柄2,现在就有两个句柄(即21)表示标准输出了,如果为两个句柄都注册同样事件的话,那么只有其中一个句柄注册的事件会被触发。

文件的读写

文件句柄不同于socket,文件所需的磁盘IO时间消耗是躲不过去的。比如你打开了一个文件,设置了EV_READEV_WRITE事件,想以此来减少磁盘IO时间。可惜的是这样并没有效果,很可能libev会一直触发事件,但是你去read或者write依然要花时间在磁盘IO上面。所以socket、管道、字符设备等等这些句柄才能达到减少消耗的效果。

建议在普通文件上用异步的模式(如libeio)来减少IO消耗,除非你知道自己在做什么。比如STDIN、STDOUT还是可以考虑用io_watcher的。

fork带来的问题

epoll和kqueue是不支持fork的,而libev是支持的,只不过要在子进程里告诉它你已经执行fork了。想在支持fork,应该在执行fork之后在子进程里调用ev_loop_fork (),打开EVFLAG_FORKCHECK,或者resort to EVBACKEND_SELECT or EVBACKEND_POLL

accept带来的问题

比如承载力较大的server通常会将有限的句柄消耗完,这时accept()就会返回ENFILE,但是呢,又不会拒绝连接,导致libev每次循环都回触发可读事件,这样就会造成进程的CPU消耗过高。

最简单的方式就是忽略这个问题,当server过载的时候就会忙等待,直到负载降下来自然问题就解决了。

推荐在进程中将所有错误给LOG下来,除了EAGAINEWOULDBLOCK,其他都可以考虑LOG下来,这样在出现奇怪的问题的时候可能有点用处,但要保证别LOG过多的垃圾信息。可以的话,在必要的时候将ev_io给stop掉一小段时间,这样能减少一点CPU消耗。

如果是单线程的话,可以先打开一些虚句柄(如/dev/null)来占位,等到accept()返回ENFILEEMFILE的时候,关闭掉一个虚句柄,再accept(),实句柄用完之后关闭,再打开虚句柄。据说这可以在程序过载的时候优雅地拒绝即将到来的连接。

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <ev.h>

void setnoblock(int fd) 
{
    int flags = fcntl(fd, F_GETFL, 0); 
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

void accept_action(struct ev_loop *main_loop, ev_io *w, int e);
void read_action(struct ev_loop *main_loop, ev_io *w, int e);
void write_action(struct ev_loop *main_loop, ev_io *w, int e);

void accept_action(struct ev_loop *main_loop, ev_io *w, int e)
{
    printf("[accept action]: new connect:\n");
    struct sockaddr_in client_addr;
    socklen_t addr_len = sizeof(client_addr);
    int client_fd = accept(w->fd, (struct sockaddr *)&client_addr, &addr_len);
    setnoblock(client_fd);
    ev_io *read_w = (ev_io *)malloc(sizeof(ev_io));
    ev_io_init(read_w, read_action, client_fd, EV_READ);
    ev_io_start(main_loop, read_w);
    printf("\t->client fd: %d\n", client_fd);
}

void read_action(struct ev_loop *main_loop, ev_io *w, int e)
{
    printf("[read action]: client request:\n");
    char buff[1024];
    bzero(buff, sizeof(buff));
    ssize_t len = read(w->fd, buff, sizeof(buff));
    printf("\t->request: %s\n", buff);
    ev_io *write_w = (ev_io *)malloc(sizeof(ev_io));
    ev_io_init(write_w, write_action, w->fd, EV_WRITE);
    ev_io_start(main_loop, write_w);
}

void write_action(struct ev_loop *main_loop, ev_io *w, int e)
{
    printf("[write action]: response.\n");
    char buff[1024] = "OK";
    ssize_t len = write(w->fd, buff, strlen(buff));
    ev_io_stop(main_loop, w);
}

void timeout_action(struct ev_loop *main_loop, ev_timer *w, int e)
{
    puts("time out.");
    //ev_break(EV_A_ EVBREAK_ONE);
}

void signal_action(struct ev_loop *main_loop, ev_signal *w, int e)
{
    printf("[signal_action]: recv signal: %d\n", w->signum);
    ev_signal_stop(main_loop, w);
    ev_break(main_loop, EVBREAK_ONE);
}

int main()
{
    int res;
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket fail:");
        exit(-1);
    }

	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(7979);
	addr.sin_addr.s_addr = INADDR_ANY;
	res = bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr));
    if (res == -1) {
        perror("bind fail:");
        exit(-1);
    }
    res = listen(listen_fd, 10);
    if (res == -1) {
        perror("listen fail:");
        exit(-1);
    }
    setnoblock(listen_fd);
    struct ev_loop *main_loop = ev_loop_new(EVBACKEND_EPOLL);
    ev_io listen_w;
    ev_io_init(&listen_w, accept_action, listen_fd, EV_READ);
    ev_io_start(main_loop, &listen_w);

    /*ev_signal signal_w;
    ev_init(&signal_w, signal_action);
    ev_signal_set(&signal_w, SIGINT);
    ev_timer timeout_w;
    ev_timer_init(&timeout_w, timeout_action, 4.9, 0);
    ev_timer_start(main_loop, &timeout_w);*/

    ev_run(main_loop, 0);
    return 0;
}
/*
 * @author  : 
 * @date    : 2014-09-04
 * @desc	: tiny socket server implemented by libev
 * 			  to use this, you should install libev at first.
 *
 * 			  server: just run the program
 * 			  client: telnet localhost 8080
 *
 * @refer	: 1). http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod
 * 			  2). http://blog.csdn.net/lengzijian/article/details/8315133
 *
 */
#include <ev.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CONNECTIONS 10

// static struct ev_loop *loop;

struct ev_io *libevlist[MAX_CONNECTIONS] = {NULL};

void ev_on_accept(struct ev_loop *loop, struct ev_io *watcher, int revents);
void ev_on_read(struct ev_loop *loop, struct ev_io *watcher, int revents);
typedef void on_data(void *in, int len);

typedef struct sock_event_callback
{

	int (*on_connect)(uint8_t state);
	int (*on_accept)(int fd);
	// int (*on_recv)(const uint8_t *, int);
	int (*on_data)(int fd, uint8_t *data, int len);
	int (*on_send)(uint8_t state, int);
	int (*on_close)(uint8_t state);
	int (*on_error)(uint8_t state);
	int (*send_keepalive_probe)(uint8_t);
	// uint8 			conn_id;
} sock_event_callback_t;

/*
 	Server 					Client
	socket					socket
	  |						  |
	  v						  v
	bind					connect
  	  |						  |
	  v						  v
	listen					write
  	  |						  |
	  v						  v
	accept					read
	  |						  |
	  v						  v
	read					close
  	  |
	  v
	write
  	  |
	  v
	close
*/


void *thr_fn(void *arg)
{
	struct ev_loop *loop = (struct ev_loop *)arg;
	ev_run(loop, 0);
	return ((void *)0);
}



struct ev_loop *server_create(char *host, int16_t port,sock_event_callback_t * cb)
{
	struct ev_loop *loop = ev_loop_new(0);

	/* socket start */
	int sd;
	struct sockaddr_in addr;
	int addr_len = sizeof(addr);
	struct ev_io *socket_watcher = (struct ev_io *)malloc(sizeof(struct ev_io));
	socket_watcher->data = cb;
	struct ev_timer *timeout_watcher = (struct ev_timer *)malloc(sizeof(struct ev_timer));

	// socket
	sd = socket(PF_INET, SOCK_STREAM, 0);
	if (sd < 0)
	{
		printf("socket error\n");
		goto ERROR;
	}

	bzero(&addr, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);

	if (!host || strlen(host) == 0)
	{
		addr.sin_addr.s_addr = INADDR_ANY;
	}
	else
	{
		addr.sin_addr.s_addr = inet_addr(host);
	}

	// set sd reuseful
	int bReuseaddr = 1;
	if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (const char *)&bReuseaddr, sizeof(bReuseaddr)) != 0)
	{
		printf("setsockopt error in reuseaddr[%d]\n", sd);
		goto ERROR;
	}

	// bind
	if (bind(sd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
	{
		printf("bind error\n");
		goto ERROR;
	}

	// listen
	if (listen(sd, SOMAXCONN) < 0)
	{
		printf("listen error\n");
		goto ERROR;
	}


	/* socket end */
	ev_io_init(socket_watcher, ev_on_accept, sd, EV_READ);
	ev_io_start(loop, socket_watcher);

	/* start loop thread */
	pthread_t tid;
	int err = pthread_create(&tid, NULL, thr_fn, loop);
	if (err != 0)
	{
		printf("can't create thread: %s\n", strerror(err));
		goto ERROR;
	}
	return loop;
	ERROR:
	free(timeout_watcher);
	free(socket_watcher);
	return NULL;

}

void ev_on_accept(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
	printf("I am: %d\n", getpid());

	sock_event_callback_t * cb = (sock_event_callback_t * )watcher->data;
	struct sockaddr_in client_addr;
	socklen_t client_len = sizeof(client_addr);
	int client_sd;

	// ev_io watcher for client
	struct ev_io *client_watcher = (struct ev_io *)malloc(sizeof(struct ev_io));
	client_watcher->data =cb;
	if (client_watcher == NULL)
	{
		printf("malloc error in accept_cb\n");
		return;
	}

	if (EV_ERROR & revents)
	{		
		if(cb&&cb->on_error){
			cb->on_error(2);
		}
		printf("error event in accept\n");
		return;
	}

	// socket accept: get file description
	client_sd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_len);
	if (client_sd < 0)
	{
		printf("accept error\n");
		return;
	}
	// too much connections
	if (client_sd > MAX_CONNECTIONS)
	{
		if(cb&&cb->on_error){
			cb->on_error(2);
		}
		printf("fd too large[%d]\n", client_sd);
		close(client_sd);
		return;
	}

	if (libevlist[client_sd] != NULL)
	{
		// too much client
		if(cb&&cb->on_error){
			cb->on_error(3);
		}
		printf("client_sd not NULL fd is [%d]\n", client_sd);
		return;
	}

	printf("client connected %d\n", client_sd);
		if(cb&&cb->on_accept){
			cb->on_accept(client_sd);
		}
	// listen new client
	ev_io_init(client_watcher, ev_on_read, client_sd, EV_READ);
	ev_io_start(loop, client_watcher);

	libevlist[client_sd] = client_watcher;
}

void ev_on_read(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
	char buffer[BUFFER_SIZE];
	ssize_t read;

	sock_event_callback_t * cb = (sock_event_callback_t * )watcher->data;

	if (EV_ERROR & revents)
	{
		printf("error event in read\n");
		return;
	}
	// socket recv
	read = recv(watcher->fd, buffer, BUFFER_SIZE, 0); // read stream to buffer
	if (read < 0)
	{
		printf("read error\n");
		return;
	}

	if (read == 0)
	{
		printf("client disconnected.\n");

		if (libevlist[watcher->fd] == NULL)
		{
			// socket already used
			if(cb && cb->on_close){
				cb->on_close(6);
			}
			printf("the fd already freed[%d]\n", watcher->fd);
		}
		else
		{
			// close socket
			if(cb && cb->on_close){
				cb->on_close(6);
			}
			close(watcher->fd);
			ev_io_stop(loop, libevlist[watcher->fd]);
			free(libevlist[watcher->fd]);
			libevlist[watcher->fd] = NULL;
		}
		return;
	}
	else
	{
		printf("receive message:%d %d\n",(int)read, watcher->fd);
	
		if(cb && cb->on_data){
				cb->on_data(watcher->fd,(uint8_t *)buffer,read);
		}
	}

	// socket send to client
	send(watcher->fd, buffer, read, 0);
	bzero(buffer, read);
}

	int _on_data(int fd, uint8_t *data, int len){
		for (int i = 0; i < len; i++)
		{
			printf("%02x ",data[i]);
		}
		printf("\n");

			send(fd, data, len, 0);

		return 0;
		
	}


int main(int argc, char const *argv[])
{
	sock_event_callback_t *cb =calloc(1,sizeof(sock_event_callback_t));
	cb->on_data=_on_data;
	server_create("0.0.0.0", 3000,cb);
	server_create("0.0.0.0", 5000,cb);
	while (1)
	{
		sleep(1);
	}
	return 0;
}

`