转载

Timer 源码解读[连载]

Timer源码解读

涉及源码

  • lib/timers.js
  • src/timer_wrap.cc
  • deps/uv/src/{unix,win}/timer.c
  • deps/uv/src/heap-inl.h

主要分为 javascript 层面的实现和 libuv 层面的实现, 而timer_wrap.cc 作为一个bridge,完成 JS 和 C++的交互调用。

使用场景

定时器主要的使用场景或者说适用场景:

  • 定时任务,比如业务中定时检查状态等;
  • 超时控制,比如网络超时控制重传。

在 node.js 的实现中,

function responseOnEnd() {     // 省略     debug('AGENT socket keep-alive');     if (req.timeoutCb) {       socket.setTimeout(0, req.timeoutCb);       req.timeoutCb = null;     }  } ` 

你可能会有疑问:为啥在 HTTP 模块要用呢?

我们知道HTTP协议采用“请求-应答”模式,当使用普通模式,即非KeepAlive模式时,每个请求/应答客户和服务器都要新建一个连接,完成之后立即断开连接(HTTP协议为无连接的协议);当使用Keep-Alive模式(又称持久连接、连接重用)时,Keep-Alive功能使客户端到服务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive功能避免了建立或者重新建立连接。

HTTP/1.0中默认是关闭的,需要在http头加入"Connection: Keep-Alive",才能启用Keep-Alive;http/1.1中默认启用Keep-Alive,如果加入"Connection: close",才关闭。目前大部分浏览器都是用HTTP/1.1协议,也就是说默认都会发起Keep-Alive的连接请求了,Node.js 默认也是 HTTP/1.1 协议。

当然了,这个连接不能就这么一直保持着,所以一般都会有一个超时时间,超过这个时间客户端还没有发送新的http请求,那么服务器就需要自动断开从而继续为其他客户端提供服务。

Node.js的HTTP 模块对于每一个新的连接创建一个 socket 对象,调用socket.setTimeout设置一个定时器用于超时后自动断开连接。

数据结构的选择

一个Timer本质上是这样的一个数据结构:deadline越近的任务拥有越高优先级,提供以下3种基本操作:

  • schedule 新增任务
  • cancel 删除任务
  • expire 执行到期的任务
实现方式 schedule cancel expire
基于链表 O(1) O(n) O(n)
基于排序链表 O(n) O(1) O(1)
基于最小堆 O(lgn) O(1) O(1)
基于时间轮 O(1) O(1) O(1)

timer 的实现历经变迁,每次变迁都是思维碰撞的火花,让我们走进源码,细细品味。

libuv 实现

数据结构-最小堆

最小堆首先是二叉堆,二叉堆是完全二元树或者是近似完全二元树,它分为两种:最大堆和最小堆。

最大堆:父结点的键值总是大于或等于任何一个子节点的键值;最小堆:父结点的键值总是小于或等于任何一个子节点的键值。示意图如下:

Timer 源码解读[连载]

节点定义在deps/uv/src/heap-inl.h,如下:

struct heap_node {   struct heap_node* left;   struct heap_node* right;   struct heap_node* parent; }; 

根节点定义:

/* A binary min heap.  The usual properties hold: the root is the lowest  * element in the set, the height of the tree is at most log2(nodes) and  * it's always a complete binary tree.  *  * The heap function try hard to detect corrupted tree nodes at the cost  * of a minor reduction in performance.  Compile with -DNDEBUG to disable.  */ struct heap {   struct heap_node* min;   unsigned int nelts; }; 

这边我们可以清楚的看到,最小堆采用指针组织数据,而不是数组。 min 始终指向最小的节点如果存在的话。作为一个排序的集合,它还需要一个用户指定的比较函数,决定哪个节点更小,或者说当过期时间一样时,决定他们的次序。毕竟没有规则不成方圆。

static int timer_less_than(const struct heap_node* ha,                            const struct heap_node* hb) {   const uv_timer_t* a;   const uv_timer_t* b;    a = container_of(ha, const uv_timer_t, heap_node);   b = container_of(hb, const uv_timer_t, heap_node);    if (a->timeout < b->timeout)     return 1;   if (b->timeout < a->timeout)     return 0;    /* Compare start_id when both have the same timeout. start_id is    * allocated with loop->timer_counter in uv_timer_start().    */   if (a->start_id < b->start_id)     return 1;   if (b->start_id < a->start_id)     return 0;    return 0; } 

这边我们可以看到,首先比较两者的 timeout ,如果二者一样,则比较二者被 schedule 的 id, 该 id 由 loop->timer_counter 递增生成,在调用

uv_timer_start 时赋值给 start_id .

具体实现

 62 int uv_timer_start(uv_timer_t* handle,  63                    uv_timer_cb cb,  64                    uint64_t timeout,  65                    uint64_t repeat) {  66   uint64_t clamped_timeout;  67   68   if (cb == NULL)  69     return -EINVAL;  70   71   if (uv__is_active(handle))  72     uv_timer_stop(handle);  73   74   clamped_timeout = handle->loop->time + timeout;  75   if (clamped_timeout < timeout)  76     clamped_timeout = (uint64_t) -1;  77   78   handle->timer_cb = cb;  79   handle->timeout = clamped_timeout;  80   handle->repeat = repeat;  81   /* start_id is the second index to be compared in uv__timer_cmp() */  82   handle->start_id = handle->loop->timer_counter++;  83   84   heap_insert((struct heap*) &handle->loop->timer_heap,  85               (struct heap_node*) &handle->heap_node,  86               timer_less_than);  87   uv__handle_start(handle);  88   89   return 0;  90 } 
  • L68-L69, 做参数的检查,错误则返回 -EINVAL。
  • L71-L72,如有是一个活跃的 timer, 则立即停止它。
  • L74-L82, 参数赋值,上面提到的 start_id 就是由 timer_counter 自增得到。
  • L84-L86, 插入 timer 节点到最小堆,此处算法复杂度为 O(lgn)。
  • L87, 标记句柄未活跃,并加入统计。
 93 int uv_timer_stop(uv_timer_t* handle) {  94   if (!uv__is_active(handle))  95     return 0;  96   97   heap_remove((struct heap*) &handle->loop->timer_heap,  98               (struct heap_node*) &handle->heap_node,  99               timer_less_than); 100   uv__handle_stop(handle); 101  102   return 0; 103 } 

L94,检查 handle, 如果是非获取的,则说明没有启动过,则返回成功。

L97-L99, 从最小堆中删除 timer的节点。

L100, 重置句柄,并减少计数。

了解了如何开启和关闭一个定时器,我们看如何调度定时器。

int uv_run(uv_loop_t* loop, uv_run_mode mode) {   ...   while (r != 0 && loop->stop_flag == 0) {     uv__update_time(loop);     uv__run_timers(loop);     ran_pending = uv__run_pending(loop);     ...  } 

在 node.js 的 event loop 中,更新时间后则立即调用 uv__run_timers ,可见 timer 作为一个外部系统依赖的模块,优先级是最高的。

150 void uv__run_timers(uv_loop_t* loop) { 151   struct heap_node* heap_node; 152   uv_timer_t* handle; 153  154   for (;;) { 155     heap_node = heap_min((struct heap*) &loop->timer_heap); 156     if (heap_node == NULL) 157       break; 158  159     handle = container_of(heap_node, uv_timer_t, heap_node); 160     if (handle->timeout > loop->time) 161       break; 162  163     uv_timer_stop(handle); 164     uv_timer_again(handle); 165     handle->timer_cb(handle); 166   } 167 } 

L155-L157, 取出最小的timer节点,如果为空,则跳出循环。

L159-L161, 通过 heap_node 的偏移拿到对象的首地址,如果最小的 timeout时间大于当前的时间,则说明过期时间还没到,则退出循环。

L163-L165, 删除 timer, 如果是需要重复执行的定时器,则通过调用 uv_timer_again 再次加入, L165执行 timer的 callback 任务后循环。

改进的分级时间轮实现

我最近实现的分级时间轮算法,思路来源于 Linux kernel, 性能有了大幅提升。

https://github.com/libuv/libuv/pull/823

连载,未完待续

下节提示: timer 的绑定; js层实现。

原文  http://alinode.aliyun.com/blog/32
正文到此结束
Loading...