转载

Boost::Asio库详解

io_service

所有的异步操作:异步网络读写,异步时钟,都在 io_service.run() 时进行轮询。有趣的是, io_service 在线程利用方面下了很大的功夫,你可以在主线程建立它的实例,但是在多个线程里面 runio_service 很擅长于将需要执行的回调函数分配到空闲线程当中。

io_service 为了跨平台,不得不在Linux下也实现Proactor模型。再说一遍,Asio的实现是Proactor模型。在Linux普遍是Reactor的情况下模拟出Proactor,作者也是很辛苦啊。 run 里面同时需要调度来自用户 post 的Handler和来自操作系统的IO请求(或者说是 Services post 的Handler)。

这里有一个Service的概念。Service是从一类IO操作中抽象出来的一些共同的地方。在你调用 socket.async_read 之类的操作之后,事实上它将真正跟操作系统打交道的操作托管给了 basic_stream_socket_service

io_service 的调度有多种实现,在 epoll 实现里,作者很机智地把 "有空闲进程" 事件转化为一个水平触发(Level Trigger),这样就可以在等待中醒来,进行调度了。

buffer

库中提供了 mutable_bufferconst_buffer 两种 单个缓冲 ,以及 mutable_buffers_1const_buffers_1 两种 缓冲序列 ,asio提供了一些操作:

  • buffer_cast<char*>(mb)单个缓冲 转成指针

  • buffer_size(buf) :取得缓冲区大小

  • buffer_copy(bufs, bufd) :缓冲区(包括单个和序列)之间复制 尽可能多的元素 ,它是安全的,不会出现溢出的情况

//`buffer` can wrap nearly everything vector<char> underlying_buffer(50); auto buf1 = buffer(underlying_buffer);  char underlying_string[] = "Hello"; auto buf2 = buffer(underlying_string);  buffer_copy(buf2, buf1); // returns 6  std::string s(buffer_begin(buf1), buffer_end(buf1));

streambuf

streambuf 是Asio能灵活地异步调控数据的关键。它能自动增长和回收consumed space。在使用的时候有这些要点:

  • streambuf 分为input sequence和output sequence两部分,这都是继承自 std::streambuf 的理念。

  • data() 来获取输入序列(常缓冲), prepare(n) 来获取输出序列(变缓冲)。

  • commit(n) 来将从输出序列提交到输入缓冲,用 consume(n) 来将输入缓冲中的数据丢弃,代表已经处理完毕。

  • read数据的过程:先 prepare 好固定大小的缓冲区,然后 buffer_copy 进去一些数据,拷贝了多少数据就 commit 多少数据。然后再从 prepare 开始,拷贝到手头上的数据没有了为止。

  • streambuf 不可拷贝,所以乖乖传引用或者指针吧。

这里有一些文档没有说明但是需要了解的细节:

  • 自动增长的功能是通过 reserve(n) 函数管理的,这个函数在 overflowprepare 处会调用,用于保证输出缓冲区里有至少n字节的空间。

  • 缓冲区的大小是没有限制的, max_size 一般是 size_t 能表示的最大数字,相当于内存的极限。

  • std::istream(sb).ignore(n)sb.gbump(n)sb.consume(n) 有相同的效果,但是 ignoreconsume 有防下溢机制。

async_read_xxxx

值得注意的是 async_read_until(socket, streambuf, condition, handler) 函数,给我们处理数据分段带来了极大的方便。它内建了4种条件决定read到什么时候停止 :

  • 出现了某个字符

  • 出现了某条字串

  • 符合了某条 Regular Expression

  • 符合了某条谓词的条件

注意:这里的"出现"指的是 streambuf 的input sequence中出现,也就是如果原本streambuf中的内容已经符合条件,则 async_read_until 将立即呼叫回调。

推论:某些库的 until 是不包含结束符的,比如 readLine 没有换行符。但是asio是包含的。

using boost::system::error_code; // this piece of code shows how to read Chunked Tranfer-Encoding streambuf sbuf; std::string content; void chunked_handler(const error_code& ec, std::size_t sz) {  std::istream is(sbuf);  std::size_t chunk_size;  is >> std::hex >> chunk_size;  std::assert(is.get() == '/r' && is.get() == '/n');  async_read(socket, sbuf,   [chunk_size] (const error_code& ec, std::size_t) {    return chunk_size + 2 - sbuf.size();},    [chunk_size] (const error_code& ec, std::size_t) {    std::istream is(sbuf);    boost::scoped_array<char> cbuf(new char[chunk_size]);    is.read(cbuf, chunk_size);    content += std::string(cbuf, chunk_size);    std::assert(is.get() == '/r' && is.get() == '/n');    async_read_until(socket, sbuf,     std::string("/r/n"), chunked_handler); }); } async_read_until(socket, sbuf,  std::string("/r/n"), chunked_handler);  

Networking

基本上都是 ip::tcp 里的东西了。 acceptor 相当于Java里的 ServerSocket ,没有任何数据传输的功能,只有作为服务器监听端口接收连接的功能。 socket 就是一般意义上的socket了,没有什么特别的。 iostream 是一个很聪明的设计,你可以用 operator >>operator << 来进行数据传输了,不过是同步阻塞的。这是使用上的一些要点:

  • socket::read_some 是非阻塞的, socket::async_read_some 会立即回调。read some的意义是,有多少读多少,没有就不读直接返回。 write_some 同理,如果网络不畅导致内核缓冲区满的话,返回0都是有可能的。

  • async_*** 的话很多时候要灵活运用 std::bind 了。

  • query 中的服务参数 service 可以是端口或者服务名,定义在 /etc/services 中。

  • socket::connect 仅对一个endpoint进行连接, connect 可对迭代器所指示的一系列endpoint进行连接,直到有其中一个成功连接为止。

using boost::system::error_code; // server: accept tcp::ip::socket sock(service); tcp::ip::acceptor acc(service); tcp::ip::endpoint ep(tcp::ip::v4(), 8080); acc.async_accpet(sock, eq, [sock] (const error_code&) {  // new connection handling }); //client: resolve + connect tcp::ip::resolver resolver(service); tcp::ip::resolver::query qurey("www.example.com", "http" /*"80"*/); resolver.async_resolve(query, [] (   const error_code& ec,   tcp::ip::resolver::iterator i) {  tcp::ip::socket sock(service);  tcp::ip::async_connect(i, [sock] (    const error_code& ec,    tcp::ip::resolver::iterator i) {   // new connection handling  }); }); 
正文到此结束
Loading...