libev


原文链接: libev

libev教程一:libev简单入门_把握自己。-CSDN博客

libev是个非常优秀的基于事件的循环库,很多开源软件,比如nodejs就是使用其实现基础功能。本系列将对该库进行源码分析。(转载请指明出于breaksoftware的csdn博客)

不知道是被墙了还是网站不再维护,它的官网(http://libev.schmorp.de/)在国内已经没法访问了。
但是我们仍然可以从github上下载其源码(https://github.com/enki/libev)。

交叉编译

./configure --host=arm-none-linux-gnueabi --prefix=pwd/libev
./configure --host=arm-mac-linux-gnueabihf CC=arm-mac-linux-gnueabihf-gcc --prefix=pwd/libev

使用样例

libev支持相对时间定时器、绝对时间定时器、文件状态监控和信号监控等功能。我们可以在它基础上,通过少量的代码实现稳健完善的功能。

我们先看一段实现定时器功能的代码

#include <ev.h>
#include <stdio.h>

ev_timer timeout_watcher;

static void
timeout_cb(EV_P_ ev_timer *w, int revents)
{
    puts("timeout");
    ev_break(EV_A_ EVBREAK_ONE);
}

int main(void)
{
    struct ev_loop *loop = EV_DEFAULT;
    ev_timer_init(&timeout_watcher, timeout_cb, 5.5, 0);
    ev_timer_start(loop, &timeout_watcher);
    ev_run(loop, 0);
    return 0;
}
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <sys/unistd.h>
#include <ev.h>


void io_action(struct ev_loop *main_loop,ev_io *io_w,int e)
{
    int rst;
    char buf[1024];
    memset(buf,0,sizeof(buf));
    puts("In IO action");
    read(STDIN_FILENO,buf,sizeof(buf));
    buf[1023]='\0';
    printf("String: %s\n",buf);
    ev_io_stop(main_loop,io_w);
}

void timer_action(struct ev_loop *main_loop,ev_timer *time_w,int e)
{
    puts("In Time action");
    ev_timer_stop(main_loop,time_w);
}

void signal_action(struct ev_loop *main_loop,ev_signal *signal_w,int e)
{
    puts("In Signal action");
    ev_signal_stop(main_loop,signal_w);
    ev_break(main_loop,EVBREAK_ALL);
}

int main(int argc,char **argv)
{
    ev_io io_w;
    ev_timer timer_w;
    ev_signal signal_w;
    struct ev_loop *main_loop = ev_default_loop(0);

    ev_init(&io_w,io_action);
    ev_io_set(&io_w,STDIN_FILENO,EV_READ);

    ev_init(&timer_w,timer_action);
    ev_timer_set(&timer_w,2,0);

    ev_init(&signal_w,signal_action);
    ev_signal_set(&signal_w,SIGINT);

    ev_io_start(main_loop,&io_w);
    ev_timer_start(main_loop,&timer_w);
    ev_signal_start(main_loop,&signal_w);

    ev_run(main_loop,0);
    return 0;
}

这段代码的结构非常简单。首先我们要定义一个名为timeout_cb的回调函数用于响应定时器。然后定义一个ev_timer结构(监视器),它通过ev_timer_init进行初始化。初始化的参数包含之前定义的响应函数指针和迭代器超时时间。ev_timer准备好后,通过ev_timer_start将其和一个ev_loop进行绑定。最后使用ev_run方法运行起来这个ev_loop指针,从而实现一个完整的定时器功能。

可见使用libev库非常方便。其实我们之后见到的其他用法和上面步骤是类似的,即:

  • 初始化ev_loop。
  • 定义监视器。
  • 定义回调函数。
  • 监视器和回调函数关联。
  • 监视器和ev_loop关联。
  • ev_run将ev_loop运行起来。

假如上面代码是个框架使用的雏形,那么如果让我们去设计这样的框架,该如何设计呢?

模型设计

首先我们需要考虑到的是使用sleep还是使用事件模型去实现逻辑等待。

如果使用sleep方法,那么我们就要在唤醒后去检测各个事件,比如要检测文件状态是否发生变化,比如定时器时间是否已经超时。于是有个问题,就是sleep多久怎么确定?我们不知道是5秒后还是1秒后文件状态发生变化,那么只能最小粒度sleep了。那么这就意味着线程在短暂挂起后,马上检测一系列可能尚未发生改变的事件。这种设计明显很消耗CPU,而且非常低效。

如果使用事件模型去等待,就可以解决上述问题。但是像定时器,在系统中并没有事件与其对应。于是我们需要考虑下对于没有事件对应的功能怎么通过事件模型去封装。

其次我们需要考虑使用单线程模型还是多线程模型。

单线程模型是让主流程和事件响应函数在一个线程中执行。其伪代码是

If (event is ready) {
	event_callback(); // in the main thead
}

其特点是实现简单,但是事件响应函数的效率将严重影响主流程对事件的响应速度。比如A、B两个事件同时发生,理论上我们希望两个事件的响应函数被同时执行,或者在允许存在的系统调用时间差(比如创建线程的消耗)内执行。然而单线程模型则会让一个响应函数执行完后再去执行另一响应函数,于是就出现排队现象。所以单线程模型无法保证及时响应。

多线程模型则完全避免了上述问题。它可在事件发生后启动一个线程去处理响应函数。当然相对来说多线程模型比较复杂,需要考虑多线程同步问题。

If (event is ready) {
	thread_excute(event_callback); // run in another thread
}

那么libev对上面两个问题是怎么选择的呢?对于sleep和事件模型,libev选择的是后者,所以它是“高性能”的。对于单线程和多线程,libev选择的是前者。至于原因我并不知道,可能是作者希望它足够简单,或者希望它能在不支持多线程的系统上使用。但是要说明一点,并不是说libev不支持多线程。因为一个单线程模型的执行体,是可以放在其他若干个线程中执行的,只要保证数据同步。

单/多线程编译

libev提供了各种编译选项以支持各种特性。比如在支持多线程的系统上,我们可以指定EV_MULTIPLICITY参数,以让libev编译出多线程版本。

libev对于单线程版本的数据都是以全局静态变量形式提供。而对于多线程版本,则提供了一个结构体——ev_loop保存数据,这样不同线程持有各自的数据对象,从而做到数据隔离。

#if EV_MULTIPLICITY

  struct ev_loop
  {
    ev_tstamp ev_rt_now;
    #define ev_rt_now ((loop)->ev_rt_now)
    #define VAR(name,decl) decl;
      #include "ev_vars.h"
    #undef VAR
  };
  #include "ev_wrap.h"

  static struct ev_loop default_loop_struct;
  EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0; /* needs to be initialised to make it a definition despite extern */

#else

  EV_API_DECL ev_tstamp ev_rt_now = 0; /* needs to be initialised to make it a definition despite extern */
  #define VAR(name,decl) static decl;
    #include "ev_vars.h"
  #undef VAR

  static int ev_default_loop_ptr;

#endif

不管是哪个版本,它们都提供了ev_default_loop_ptr变量。多线程版本它将指向全局静态变量default_loop_struct,这样对于使用了多线程版本又不想维护ev_loop结构对象的用户来说,直接使用这个对象就行了,非常方便。

然后再看下ev_vars.h的引入。其实现如下:

#define VARx(type,name)VAR(name, type name)

VARx(ev_tstamp, now_floor) /* last time we refreshed rt_time */
VARx(ev_tstamp, mn_now)    /* monotonic clock "now" */
VARx(ev_tstamp, rtmn_diff) /* difference realtime - monotonic time */

/* for reverse feeding of events */
VARx(W *, rfeeds)
VARx(int, rfeedmax)
VARx(int, rfeedcnt)

VAR (pendings, ANPENDING *pendings [NUMPRI])
VAR (pendingmax, int pendingmax [NUMPRI])
VAR (pendingcnt, int pendingcnt [NUMPRI])

在多线程版本中,它在ev_loop结构体中被引入的。这样在编译器展开文件时,它将会被定义到结构体内部。在单线程版本中,VAR宏被声明为定义一个静态全局变量的形式。这种利用宏和编译展开技术,在不同结构中定义相同类型数据的方式还是很有意思的。

但是又会有个问题,如何去访问这些变量呢?在单线程中,它们是静态变量,所有位置可以直接通过名称访问。而多线程版本中,则需要通过一个ev_loop结构体去引导。相关的代码总不能通过EV_MULTIPLICITY宏来区分不同变量形式吧?如果那样,代码将变得非常难看。我们看下libev怎么巧妙解决这个问题的。

上面代码块的多线程定义区间,引入了ev_wrap.h文件。其实现如下:

#ifndef EV_WRAP_H
#define EV_WRAP_H
#define acquire_cb ((loop)->acquire_cb)
#define activecnt ((loop)->activecnt)
#define anfdmax ((loop)->anfdmax)
#define anfds ((loop)->anfds)
#define async_pending ((loop)->async_pending)
#define asynccnt ((loop)->asynccnt)

这样使用一个和变量相同名称的宏替代了通过ev_loop结构体对象访问的变量。且这个宏名称和单线程版本中静态变量名相同。这样就让不同版本的关键变量“同名”了。于是代码对这些变量的访问直接使用其原始名称即可——单线程中使用的是真实变量名,多线程中使用的是宏。
这样的设计,又引入一个问题。那就是所有使用这些变量的函数,在多线程版本中,需要提供一个名字为loop的ev_loop结构体对象;而在单线程版本中则不需要。为了固化这个名称,libev还为此专门定义了一系列宏。

#if EV_MULTIPLICITY
struct ev_loop;
# define EV_P  struct ev_loop *loop               /* a loop as sole parameter in a declaration */
# define EV_P_ EV_P,                              /* a loop as first of multiple parameters */
# define EV_A  loop                               /* a loop as sole argument to a function call */
# define EV_A_ EV_A,                              /* a loop as first of multiple arguments */
# define EV_DEFAULT_UC  ev_default_loop_uc_ ()    /* the default loop, if initialised, as sole arg */
# define EV_DEFAULT_UC_ EV_DEFAULT_UC,            /* the default loop as first of multiple arguments */
# define EV_DEFAULT  ev_default_loop (0)          /* the default loop as sole arg */
# define EV_DEFAULT_ EV_DEFAULT,                  /* the default loop as first of multiple arguments */
#else
# define EV_P void
# define EV_P_
# define EV_A
# define EV_A_
# define EV_DEFAULT
# define EV_DEFAULT_
# define EV_DEFAULT_UC
# define EV_DEFAULT_UC_
# undef EV_EMBED_ENABLE
#endif

之后我们在代码中导出可见的EV_P和EV_A就是为了保证不同版本的实现在代码层面是相同的。

libev

1.1 Introduction

主页http://software.schmorp.de/pkg/libev.html.

文档http://software.schmorp.de/pkg/libev.html.

libev所实现的功能就是一个强大的reactor,可能notify事件主要包括下面这些:

  • ev_io // IO可读可写
  • ev_stat // 文件属性变化
  • ev_async // 激活线程
  • ev_signal // 信号处理
  • ev_timer // 定时器
  • ev_periodic // 周期任务
  • ev_child // 子进程状态变化
  • ev_fork // 开辟进程
  • ev_cleanup // event loop退出触发事件
  • ev_idle // 每次event loop空闲触发事件
  • ev_embed // TODO(zhangyan04):I have no idea.
  • ev_prepare // 每次event loop之前事件
  • ev_check // 每次event loop之后事件

1.2 About The Code

代码风格相当严谨而且排版也非常工整,并且从域名看出作者是德国人。但是内部使用了大量的宏造成阅读代码并不是非常方便。

并且从代码角度分析,应该是一开始支持有一个默认的event_loop,但是随着多核产生实际应用中可能会使用到多个event_loop, 猜想作者应该是为了方便的话使用了很多宏进行替换。

允许使用多个event_loop的宏是EV_MULTIPLICITY.

比如下面这段代码

void noinline  
ev_io_start (EV_P_ ev_io *w)  
{

  int fd = w->fd;  

  if (expect_false (ev_is_active (w)))  
    return;  

  assert (("libev: ev_io_start called with negative fd", fd >= 0));  
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));  

  EV_FREQUENT_CHECK;  

  ev_start (EV_A_ (W)w, 1);  
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);  
  wlist_add (&anfds[fd].head, (WL)w);  

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);  
  w->events &= ~EV__IOFDSET;  

  EV_FREQUENT_CHECK;  
}  

初次阅读这个代码会觉得非常难懂。

说明定义
EV_Pevent parameterstruct ev_loop* loop
EVPEV_P,
EV_Aevent argumentloop
EVAEV_A,

然后很多变量只要是ev_loop成员的话都被封装成为了宏。比如代码里面的anfds,实际上的宏定义是

#define anfds           ((loop)->anfds) 

事实上一个ev_loop里面的字段是相当多的,不过也很正常本身就是一个强大的reactor.但是这造成一个直接后果,就是对于想要了解ev_loop的全貌比较困难,所以想要彻底地了解libev也比较麻烦,所以我们只能够从应用层面来尝试了解它。

1.3 EventLoop

首先我们关注一下reactor本身。在libev下面reactor对象称为event_loop.

event_loop允许动态创建和销毁,并且允许绑定自定义数据

struct ev_loop * ev_loop_new (unsigned int flags);  
void ev_loop_destroy (EV_P);  
void ev_set_userdata (EV_P_ void *data);  
void *ev_userdata (EV_P);

我们这里主要关注一下flags.这里面主要是选择使用什么backend来进行poll操作,可以选择的有:

  • EVBACKEND_SELECT
  • EVBACKEND_POLL
  • EVBACKEND_EPOLL // 通常我们选择这个
  • EVBACKEND_KQUEUE
  • EVBACKEND_DEVPOLL
  • EVBACKEND_PORT

但是还有三个比较重要选项:

  • EVFLAG_NOINOTIFY // 不适用inofity调用来使用ev_stat.这样可以减少fd使用。
  • EVFLAG_SIGNALFD // 使用signalfd来检测信号是否发生,同样这样可以减少fd使用。

大部分时候我们使用EVFLAG_AUTO(0)一般就足够满足需求了,从代码角度来看如果支持epoll的话那么首先会选择epoll. 因为在watcher的回调函数里面是可以知道当前event_loop的,这样就可以获得自定义数据。然后我们看看这个event_loop如何运行和停止的

void ev_run (EV_P_ int flags);  
void ev_break (EV_P_ int how);

同样我们这里比较关注flags和how这两个参数。flags有下面这几个:

  • 0.通常这是我们想要的,每次轮询在poll都会等待一段时间然后处理pending事件。
  • EVRUN_NOWAIT.运行一次,在poll时候不会等待。这样效果相当于只是处理pending事件。
  • EVRUN_ONCE.运行一次,但是在poll时候会等待,然后处理pending事件。

而how有下面这几个:

  • EVBREAK_ONE.只是退出一次ev_run这个调用。通常来说使用这个就可以了。
  • EVBREAK_ALL.退出所有的ev_run调用。这种情况存在于ev_run在pengding处理时候会递归调用。

在backend/epoll底层每次epoll_wait时候,libev提供了接口回调可以在epoll_wait前后调用

void ev_set_loop_release_cb (loop, void (*release)(EV_P), void (*acquire)(EV_P))  
static void  
epoll_poll (EV_P_ ev_tstamp timeout)  
{

  /* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */  
  /* the default libev max wait time, however. */  
  EV_RELEASE_CB;  
  eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax,  
                         epoll_epermcnt ? 0 : ev_timeout_to_ms (timeout));  
  EV_ACQUIRE_CB;  
}  

在event_loop里面我们还关心一件事情,就是每次event_loop轮询的时间长短。通常来说这个不会是太大问题,但是在高性能情况下面我们需要设置

void ev_set_io_collect_interval (EV_P_ ev_tstamp interval);  
void ev_set_timeout_collect_interval (EV_P_ ev_tstamp interval);  

在ev_run里面有使用这些参数的代码比较麻烦。但是大意是这样,如果我们这是了timeout_interval的话,那么我们每次检查timeout时间的话必须在timeout_interval,使用这段时间ev_sleep.但是这个又会影响到io_interval,所以内部做了一些换算,换算的结果作为epoll_wait超时时间。不过同样在大部分时候我们不需要关心它,默认时候是0.0,系统会使用最快的响应方式来处理。

1.4 Watcher

然后我们关心一下EventHandler.在libev下面watcher相当于EventHandler这么一个概念,通常里面会绑定fd回调函数以及我们需要关注的事件。然后一旦触发事件之后会触发我们使用的回调函数,回调函数参数通常有reactor,watcher以及触发的事件。这里不打算重复文档里面的watcher 相关的内容和对应的API,但是对于某些内容的话可能会提到并且附带一些注释。之前我们还是看看通用过程,这里使用TYPE区分不同类型watcher.

typedef void (*)(struct ev_loop *loop, ev_TYPE *watcher, int revents) callback; // callback都是这种类型  
ev_init (ev_TYPE *watcher, callback);                         // 初始化watcher  
ev_TYPE_set (ev_TYPE *watcher, [args]);                       // 设置watcher  
ev_TYPE_init (ev_TYPE *watcher, callback, [args]);            // 通常使用这个函数最方便,初始化和设置都在这里  
ev_TYPE_start (loop, ev_TYPE *watcher);                       // 注册watcher  
ev_TYPE_stop (loop, ev_TYPE *watcher);                        // 注销watcher  
ev_set_priority (ev_TYPE *watcher, int priority);             // 设置优先级  
ev_feed_event (loop, ev_TYPE *watcher, int revents);          // 这个做跨线程通知非常有用,相当于触发了某个事件。  

bool ev_is_active (ev_TYPE *watcher);                         // watcher是否active.  
bool ev_is_pending (ev_TYPE *watcher);                        // watcher是否pending.  
int ev_clear_pending (loop, ev_TYPE *watcher);                // 清除watcher pending状态并且返回事件  

wacther的状态有下面这么几种:

  • initialiased. 调用init函数初始化
  • active. 调用start进行注册
  • pending. 已经触发事件但是没有处理
  • inactive. 调用stop注销。这个状态等同于initialised这个状态。

其实关于每个watcher具体是怎么实现的没有太多意思,因为大部分现有代码都差不多。会在下一节说说内部数据结构是怎么安排的,了解内部数据结构以及过程之后很多问题就可以避免了,比如"The special problem of disappearing file descriptors"这类问题。

1.5 How it works

1.5.1 ev_run

最主要的还是看看ev_run这个部分代码。我们不打算仔细阅读只是看看梗概然后大体分析一下数据结构应该怎么样的

void  
ev_run (EV_P_ int flags)  
{

  assert (("libev: ev_loop recursion during release detected", loop_done != EVBREAK_RECURSE));  

  loop_done = EVBREAK_CANCEL;  

  EV_INVOKE_PENDING; /* in case we recurse, ensure ordering stays nice and clean */   

  do  
    {

      if (expect_false (loop_done))  
        break;  

      /* update fd-related kernel structures */  
      fd_reify (EV_A);  

      /* calculate blocking time */  
      {

        ev_tstamp waittime  = 0.;  
        ev_tstamp sleeptime = 0.;  

        /* remember old timestamp for io_blocktime calculation */  
        ev_tstamp prev_mn_now = mn_now;  

        /* update time to cancel out callback processing overhead */  
        time_update (EV_A_ 1e100);  

        if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt)))  
          {

            waittime = MAX_BLOCKTIME;  

            if (timercnt)  
              {

                ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge;  
                if (waittime > to) waittime = to;  
              }  

            /* don't let timeouts decrease the waittime below timeout_blocktime */  
            if (expect_false (waittime < timeout_blocktime))  
              waittime = timeout_blocktime;  

            /* extra check because io_blocktime is commonly 0 */  
            if (expect_false (io_blocktime))  
              {

                sleeptime = io_blocktime - (mn_now - prev_mn_now);  

                if (sleeptime > waittime - backend_fudge)  
                  sleeptime = waittime - backend_fudge;  

                if (expect_true (sleeptime > 0.))  
                  {

                    ev_sleep (sleeptime);  
                    waittime -= sleeptime;  
                  }  
              }  
          }  

        assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */  
        backend_poll (EV_A_ waittime);  
        assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */  

        /* update ev_rt_now, do magic */  
        time_update (EV_A_ waittime + sleeptime);  
      }  

      /* queue pending timers and reschedule them */  
      timers_reify (EV_A); /* relative timers called last */  

      EV_INVOKE_PENDING;  
    }  
  while (expect_true (  
    activecnt  
    && !loop_done  
    && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))  
  ));  

  if (loop_done == EVBREAK_ONE)  
    loop_done = EVBREAK_CANCEL;  
}

我们可以总结一下大致步骤,其实和大部分的event loop写出来差不多。

  • 首先触发那些已经pending的watchers.
  • 判断是否loop_done
  • fd_reify.这个后面会单独说。
  • 计算出waittime并且进行必要的sleep.
  • backend_poll开始轮询,并且整理好pending事件
  • timers_reify.这个和fd_reify不同
  • 调用EV_INVOKE_PENDING来触发pending的io事件

非常简单。接下来我们看看fd_reify,backend_poll,timers_reify以及EV_INVOKE_PENDING.

1.5.2 fd_reify

下面是fd_reify代码片段.可以看出,这个部分就是在修改fd关注的events。

inline_size void  
fd_reify (EV_P)  
{

  int i;  
  for (i = 0; i < fdchangecnt; ++i)  
    {

      int fd = fdchanges [i];  
      ANFD *anfd = anfds + fd;  
      ev_io *w;  

      unsigned char o_events = anfd->events;  
      unsigned char o_reify  = anfd->reify;  

      anfd->reify  = 0;  

      /*if (expect_true (o_reify & EV_ANFD_REIFY)) probably a deoptimisation */  
        {

          anfd->events = 0;  

          for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)  
            anfd->events |= (unsigned char)w->events;  

          if (o_events != anfd->events)  
            o_reify = EV__IOFDSET; /* actually |= */  
        }  

      if (o_reify & EV__IOFDSET)  
        backend_modify (EV_A_ fd, o_events, anfd->events);  
    }  

  fdchangecnt = 0;  
}

而这个fdchanges这个是在哪里调用的呢。我们可以看到就是在ev_io_start这个部分。也就是说如果我们想要修改 fd关注事件的话,我们必须显示地ev_io_stop掉然后修正之后重新ev_io_start.底层调用fd_change的话底层维护数组fdchanges来保存发生events变动的fd.

void noinline  
ev_io_start (EV_P_ ev_io *w)  
{

  int fd = w->fd;  

  if (expect_false (ev_is_active (w)))  
    return;  

  assert (("libev: ev_io_start called with negative fd", fd >= 0));  
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));  

  EV_FREQUENT_CHECK;  

  ev_start (EV_A_ (W)w, 1);  
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);  
  wlist_add (&anfds[fd].head, (WL)w);  

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);  
  w->events &= ~EV__IOFDSET;  

  EV_FREQUENT_CHECK;  
}  

inline_size void  
fd_change (EV_P_ int fd, int flags)  
{

  unsigned char reify = anfds [fd].reify;  
  anfds [fd].reify |= flags;  

  if (expect_true (!reify))  
    {

      ++fdchangecnt;  
      array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);  
      fdchanges [fdchangecnt - 1] = fd;  
    }  
}  

1.5.3 backend_poll

backend_poll底层支持很多poll实现,我们这里仅仅看ev_epoll.c就可以.代码在这里面我们不列举了,如果某个fd触发事件的话那么最终会调用fd_event(EVA,fd,event)来进行通知。所以我们看看fd_event.

inline_speed void  
fd_event_nocheck (EV_P_ int fd, int revents)  
{

  ANFD *anfd = anfds + fd;  
  ev_io *w;  

  for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)  
    {

      int ev = w->events & revents;  

      if (ev)  
        ev_feed_event (EV_A_ (W)w, ev);  
    }  
}  
void noinline  
ev_feed_event (EV_P_ void *w, int revents)  
{

  W w_ = (W)w;  
  int pri = ABSPRI (w_);  

  if (expect_false (w_->pending))  
    pendings [pri][w_->pending - 1].events |= revents;  
  else  
    {

      w_->pending = ++pendingcnt [pri];  
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);  
      // set the watcher and revents.  
      pendings [pri][w_->pending - 1].w      = w_;  
      pendings [pri][w_->pending - 1].events = revents;  
    }  
}  

可以看到底层是一个ANFD的数组,根据fd进行偏移。如果fd过大的话似乎会影响性能没有hpserver里面的demuxtable实现方式好。然后得到这个fd下面所有的watcher,然后在loop->pendings里面记录所有这些触发的watcher.

1.5.4 timers_reify

其中HEAP0就是最小堆下标。如果repeat的话说明需要重复发生,那么就会重新调整时间戳,如果不是repeat的话,那么内部会调用ev_timer_stop这个方法将这个计时器移除。所有的定时任务都通过feed_reverse添加。feed_reverse 内部是维护一个动态数组来保存所有的定时器任务,然后在feed_reverse_done里面遍历这些任务来触发这些定时器任务。

inline_size void  
timers_reify (EV_P)  
{

  EV_FREQUENT_CHECK;  

  if (timercnt && ANHE_at (timers [HEAP0]) < mn_now)  
    {

      do  
        {

          ev_timer *w = (ev_timer *)ANHE_w (timers [HEAP0]);  

          /*assert (("libev: inactive timer on timer heap detected", ev_is_active (w)));*/  

          /* first reschedule or stop timer */  
          if (w->repeat)  
            {

              ev_at (w) += w->repeat;  
              if (ev_at (w) < mn_now)  
                ev_at (w) = mn_now;  

              assert (("libev: negative ev_timer repeat value found while processing timers", w->repeat > 0.));  

              ANHE_at_cache (timers [HEAP0]);  
              downheap (timers, timercnt, HEAP0);  
            }  
          else  
            ev_timer_stop (EV_A_ w); /* nonrepeating: stop timer */  

          EV_FREQUENT_CHECK;  
          feed_reverse (EV_A_ (W)w);  
        }  
      while (timercnt && ANHE_at (timers [HEAP0]) < mn_now);  

      feed_reverse_done (EV_A_ EV_TIMER);  
    }  
}

1.5.5 EV_INVOKE_PENDING

这个宏最终调用的函数就是下面这个,遍历所有的pendings事件并且逐一触发。

void noinline

```c++
ev_invoke_pending (EV_P)  
{

  int pri;  

  for (pri = NUMPRI; pri--; )  
    while (pendingcnt [pri])  
      {

        ANPENDING *p = pendings [pri] + --pendingcnt [pri];  

        p->w->pending = 0;  
        EV_CB_INVOKE (p->w, p->events);  
        EV_FREQUENT_CHECK;  
      }  
}  

1.6 Example

尝试编写一个简单的带有超时的echo-server和echo-client就发现其实还有非常多的其他的工作量,比如buffer的管理状态机实现等。所以我没有写出一个完整的example,只是简单地写了假设echo-client连接上server的话就简单地打印链接信息并且关闭。

1.6.1 common.h

\#ifndef _COMMON_H_  
\#define _COMMON_H_  

\#include <unistd.h>  
\#include <fcntl.h>  
\#include <sys/types.h>  
\#include <sys/socket.h>  
\#include <arpa/inet.h>  

\#include <strings.h>  
\#include <cstdlib>  
\#include <cstdio>  
\#include <cstddef>  
\#include <string>  

namespace common{


\#define D(exp,fmt,...) do {
                   \  
        if(!(exp)){
                           \  
            fprintf(stderr,fmt,\##__VA_ARGS__);  \  
            abort();                            \  
        }                                       \  
    }while(0)  

static void setnonblock(int fd){

    fcntl(fd,F_SETFL,fcntl(fd,F_GETFL) | O_NONBLOCK);  
}  
static void setreuseaddr(int fd){

    int ok=1;  
    setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&ok,sizeof(ok));  
}  

static void setaddress(const char* ip,int port,struct sockaddr_in* addr){

    bzero(addr,sizeof(*addr));  
    addr->sin_family=AF_INET;  
    inet_pton(AF_INET,ip,&(addr->sin_addr));  
    addr->sin_port=htons(port);  
}  

static std::string address_to_string(struct sockaddr_in* addr){

    char ip[128];  
    inet_ntop(AF_INET,&(addr->sin_addr),ip,sizeof(ip));  
    char port[32];  
    snprintf(port,sizeof(port),"%d",ntohs(addr->sin_port));  
    std::string r;  
    r=r+"("+ip+":"+port+")";      
    return r;  
}  

static int new_tcp_server(int port){

    int fd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
    D(fd>0,"socket failed(%m)\n");  
    setnonblock(fd);  
    setreuseaddr(fd);  
    sockaddr_in addr;  
    setaddress("0.0.0.0",port,&addr);  
    bind(fd,(struct sockaddr*)&addr,sizeof(addr));  
    listen(fd,64); // backlog = 64  
    return fd;  
}  

static int new_tcp_client(const char* ip,int port){

    int fd=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);  
    setnonblock(fd);  
    sockaddr_in addr;  
    setaddress(ip,port,&addr);  
    connect(fd,(struct sockaddr*)(&addr),sizeof(addr));  
    return fd;  
}  

}; // namespace common  

\#endif // _COMMON_H_  

1.6.2 echo-client.cc

\#include "ev.h"  
\#include "common.h"  

static void do_connected(struct ev_loop* reactor,ev_io* w,int events){

    close(w->fd);  
    ev_break(reactor,EVBREAK_ALL);  
}  

int main(){

    struct ev_loop* reactor=ev_loop_new(EVFLAG_AUTO);  
    int fd=common::new_tcp_client("127.0.0.1",34567);  
    ev_io io;  
    ev_io_init(&io,&do_connected,fd,EV_WRITE);  
    ev_io_start(reactor,&io);  
    ev_run(reactor,0);  
    close(fd);  
    ev_loop_destroy(reactor);  
    return 0;  
}  

1.6.3 echo-server.cc

\#include "ev.h"  

\#include "common.h"  

static void do_accept(struct ev_loop* reactor,ev_io* w,int events){

    struct sockaddr_in addr;  
    socklen_t addr_size=sizeof(addr);  
    int conn=accept(w->fd,(struct sockaddr*)&addr,&addr_size);  
    std::string r=common::address_to_string(&addr);  
    fprintf(stderr,"accept %s\n",r.c_str());  
    close(conn);  
}  

int main(){

    struct ev_loop* reactor=ev_loop_new(EVFLAG_AUTO);  
    int fd=common::new_tcp_server(34567);  
    ev_io w;  
    ev_io_init(&w,do_accept,fd,EV_READ);  
    ev_io_start(reactor,&w);  
    ev_run(reactor,0);  
    close(fd);  
    ev_loop_destroy(reactor);      

}

http://wangjunle23.blog.163.com/blog/static/11783817120124308920321/
`