转载

Tornado Demo 之 chatdemo 不完全解读

tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo

首先看 chatdemo 的目录结构:

├── chatdemo.py ├── static │   ├── chat.css │   └── chat.js └── templates     ├── index.html     ├── message.html     └── room.html

非常简单,基本没有分层,三个模版一个 js 一个 css ,还有一个最重要的 chatdemo.py

本文的重点是弄清楚 chatdemo.py 的运行流程,所以对于此项目的其他文件,包括模版及 chat.js 的实现都不会分析,只要知道 chat.js 的工作流程相信对于理解 chatdemo.py 没有任何问题

此 demo 主要基于长轮询。 获取新消息的原理:

  1. 在 chat.js 中有一个定时器会定时执行 update 操作

  2. 当没有新消息时 tornado 会一直 hold 住 chat.js 发来的 update 请求

  3. 当有新消息时 tornado 将包含新消息的数据返回给所有 hold 的 update 请求

  4. 此时 chat.js 收到 update 回复后更新返回数据在聊天室中,同时再进行一次 update 请求, 然后又从 1. 开始执行。

发送新消息的原理:

  1. 输入消息, 点击 post 按钮, chat.js 获取表单后用 ajax 方式发送请求 new

  2. tornado 收到请求 new ,返回消息本身, 同时通知所有 hold 住的 update 请求 ( 这里也包括发送 new 请求的 chat.js 所发送的 update 请求 ) 返回新消息

  3. 所有在线的 chat.js 收到 update 请求回复,更新返回信息到聊天室,同时再进行一次 update 请求。

清楚了以上流程,我们直接来看 chatdemo.py :

def main():     parse_command_line()     app = tornado.web.Application(         [             (r"/", MainHandler),             (r"/a/message/new", MessageNewHandler),             (r"/a/message/updates", MessageUpdatesHandler),             ],         cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",         template_path=os.path.join(os.path.dirname(__file__), "templates"),         static_path=os.path.join(os.path.dirname(__file__), "static"),         xsrf_cookies=True,         debug=options.debug,         )     app.listen(options.port)     tornado.ioloop.IOLoop.current().start()   if __name__ == "__main__":     main()

main 函数主要用作初始化应用、监听端口以及启动 tornado server 。我们看路由:

  1. 主页对应 MainHandler

  2. new 请求对应 MessageNewHandler

  3. updates 请求对应 MessageUpdatesHandler

下面来看 MainHandler :

# Making this a non-singleton is left as an exercise for the reader. global_message_buffer = MessageBuffer()  class MainHandler(tornado.web.RequestHandler):     def get(self):         self.render("index.html", messages=global_message_buffer.cache)

只有一行代码,就是渲染并返回 index.html,渲染的附加信息就是 global_message_buffer 的所有缓存消息。 global_message_buffer 是 MessageBuffer 的一个实例。 我们先不关心 MessageBuffer 内部是什么,现在我们只要记住它主要是 用来储存聊天消息和连接到此聊天室的人的信息的类 。 其中 MessageBuffer().cache 就是保存聊天室所有聊天消息的结构。

然后来看 MessageNewHandler :

class MessageNewHandler(tornado.web.RequestHandler):     def post(self):         message = {             "id": str(uuid.uuid4()),             "body": self.get_argument("body"),         }         # to_basestring is necessary for Python 3's json encoder,         # which doesn't accept byte strings.         message["html"] = tornado.escape.to_basestring(             self.render_string("message.html", message=message))         if self.get_argument("next", None):             self.redirect(self.get_argument("next"))         else:             self.write(message)         global_message_buffer.new_messages([message])

同样很简单,从 post 信息里获取发来的新消息 ( body ) ,然后给消息分配一个唯一的 uuid,接着把这段消息渲染成一段 html ,然后 self.write(message) 返回这段 html, 同时给 global_message_buffer ( MessageBuffer 的实例 ) 添加这条新信息。 这里其实我更倾向于返回 json 之类的数据,这样更加直观和规范,可能写 demo 的人考虑到读者对 json 之类的协议可能不熟悉故而选择了返回渲染好的 html 直接让 chat.js append 到 index.html 里。

接着来看 MessageUpdatesHandler :

class MessageUpdatesHandler(tornado.web.RequestHandler):     @gen.coroutine     def post(self):         cursor = self.get_argument("cursor", None)         # Save the future returned by wait_for_messages so we can cancel         # it in wait_for_messages         self.future = global_message_buffer.wait_for_messages(cursor=cursor)         messages = yield self.future         if self.request.connection.stream.closed():             return         self.write(dict(messages=messages))      def on_connection_close(self):         global_message_buffer.cancel_wait(self.future)

重点就在这里,可以看到其内部的 post 方法被 gen.coroutine 修饰器修饰,也就是说这个 post 方法现在是 协程 ( coroutine ) 方式工作。 对于协程比较陌生的童鞋,你可以直接把它当作是 单线程解决 io ( 网络请求 ) 密集运算被阻塞而导致低效率的解决方案 。 当然这样理解协程还比较笼统,之后我会详细写一篇关于协程的文章,但在这里这样理解是没有问题的。

现在来看代码内容,首先获取 cursor ,一个用来标识我们已经获取的消息的指针,这样 tornado 就不会把你已经获取的消息重复的发给你。 然后调用 global_message_buffer.wait_for_messages(cursor=cursor) 获取一个 future 对象。 future 对象是 tornado 实现的一个特殊的类的实例,它的作用就是 包含之后 ( 未来 ) 将会返回的数据 ,我们现在不用关心 Future() 内部如何实现,只要记住上面它的作用就行。 关于 Future 的解读我会放到阅读 Future 源码时讲。

然后看最关键的这句: messages = yield self.future 注意这里的 yield 就是 hold updates 请求的关键,它到这里相当于 暂停了整个 post 函数 ( updates 请求被 hold )同时也相当于 updates 这次网络请求被阻塞,这个时候协程发挥作用,把这个函数暂停的地方的所有信息保存挂起,然后把工作线程释放,这样 tornado 可以继续接受 new、 updates 等请求然后运行相应的方法处理请求。

当有新的消息返回时, tornado 底层的 ioloop 实例将会调用 gen.send(value) 返回新消息( value )给每个被暂停的方法的 yield 处, 此时协程依次恢复这些被暂停的方法, 同时用获得的返回消息继续执行方法, 这时 messages = yield self.future 继续执行,messages 获得 yield 的返回值 value ( python 中调用 gen.send(value) 将会把 value 值返回到 yield 处并替换原前 yield 后的值 ),然后判断下用户是否已经离开,如果还在线则返回新消息。

明白了以上流程,我们最后来看 MessageBuffer :

class MessageBuffer(object):     def __init__(self):         self.waiters = set()         self.cache = []         self.cache_size = 200      def wait_for_messages(self, cursor=None):         # Construct a Future to return to our caller.  This allows         # wait_for_messages to be yielded from a coroutine even though         # it is not a coroutine itself.  We will set the result of the         # Future when results are available.         result_future = Future()         if cursor:             new_count = 0             for msg in reversed(self.cache):                 if msg["id"] == cursor:                     break                 new_count += 1             if new_count:                 result_future.set_result(self.cache[-new_count:])                 return result_future         self.waiters.add(result_future)         return result_future      def cancel_wait(self, future):         self.waiters.remove(future)         # Set an empty result to unblock any coroutines waiting.         future.set_result([])      def new_messages(self, messages):         logging.info("Sending new message to %r listeners", len(self.waiters))         for future in self.waiters:             future.set_result(messages)         self.waiters = set()         self.cache.extend(messages)         if len(self.cache) > self.cache_size:             self.cache = self.cache[-self.cache_size:]

初始化方法中 self.waiters 就是一个等待新消息的 listener 集合 ( 直接理解成所有被 hold 住的 updates 请求队列可能更清晰 )

self.cache 就是储存所有聊天消息的列表, self.cache_size = 200 则定义了 cache 的大小 是存 200 条消息。

然后先来看简单的 new_messages:

遍历 waiters 列表,然后给所有的等待者返回新消息,同时清空等待者队列。 然后把消息加到缓存 cache 里,如果缓存大于限制则取最新的 200 条消息。这里只要注意到 future.set_result(messages) 就是用来给 future 对象添加返回数据 ( 之前被 yield 暂停的地方此时因为 set_result() 方法将会获得 "未来" 的数据 ) 这一点即可。

然后来看 wait_for_messages :

    def wait_for_messages(self, cursor=None):         # Construct a Future to return to our caller.  This allows         # wait_for_messages to be yielded from a coroutine even though         # it is not a coroutine itself.  We will set the result of the         # Future when results are available.         result_future = Future()         if cursor:             new_count = 0             for msg in reversed(self.cache):                 if msg["id"] == cursor:                     break                 new_count += 1             if new_count:                 result_future.set_result(self.cache[-new_count:])                 return result_future         self.waiters.add(result_future)         return result_future

首先初始化一个 Future 对象,然后根据 cursor 判断哪些消息已经获取了哪些还没获取,如果缓存中有对于这个 waiter 还没获取过的消息,则直接调用 set_result() 返回这些缓存中已有的但对于这个 waiter 来说是新的的数据。 如果这个 waiter 已经有缓存中的所有数据,那么就把它加到等待者队列里保持等待,直到有新消息来时调用 new_messages 再返回。

而最后一个 cancel_wait 就很简单了,当有用户退出聊天室时,直接从 self.waiters 中移除他所对应的等待者。

当明白了整个代码的运行流程后,我们可以基于这个简单的 demo 而写出更加丰富的例子,比如加入 session ,做登陆、做好友关系,做单聊做群聊等等。

chatdemo with room 是我添加的一个简单功能,输入聊天室房号后再进行聊天,只有同一房间中的人才能收到彼此的消息。

以上就是鄙人对整个 chatdemo.py 的解读。 在阅读此 demo 时,我没有参考其他源码解读,只是通过阅读 tornado 底层的源码而得出的个人的理解,因此肯定会有很多理解不成熟甚至错误的地方,还望大家多多指教。

原文地址

作者: rapospectre

原文  https://segmentfault.com/a/1190000005780743
正文到此结束
Loading...