這一章涵蓋了使用Boost.Asio時必須知道的一些事情。我們也將深入研究比同步編程更復雜、更有樂趣的異步編程。
這一部分包含了當使用Boost.Asio編寫網(wǎng)絡應用程序時必須知道的事情。
Boost.Asio的所有內(nèi)容都包含在boost::asio命名空間或者其子命名空間內(nèi)。
對于IP地址的處理,Boost.Asio提供了ip::address , ip::address_v4和ip::address_v6類。 它們提供了相當多的函數(shù)。下面列出了最重要的幾個:
大多數(shù)情況你會選擇用ip::address::from_string:
ip::address addr = ip::address::from_string("127.0.0.1");
如果你想通過一個主機名進行連接,下面的代碼片段是無用的:
// 拋出異常
ip::address addr = ip::address::from_string("www.yahoo.com");
端點是使用某個端口連接到的一個地址。不同類型的socket有它自己的endpoint類,比如ip::tcp::endpoint、ip::udp::endpoint和ip::icmp::endpoint
如果想連接到本機的80端口,你可以這樣做:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
有三種方式來讓你建立一個端點:
例子如下:
ip::tcp::endpoint ep1;
ip::tcp::endpoint ep2(ip::tcp::v4(), 80);
ip::tcp::endpoint ep3( ip::address::from_string("127.0.0.1), 80);
如果你想連接到一個主機(不是IP地址),你需要這樣做:
// 輸出 "87.248.122.122"
io_service service;
ip::tcp::resolver resolver(service);
ip::tcp::resolver::query query("www.yahoo.com", "80");
ip::tcp::resolver::iterator iter = resolver.resolve( query);
ip::tcp::endpoint ep = *iter;
std::cout << ep.address().to_string() << std::endl;
你可以用你需要的socket類型來替換tcp。首先,為你想要查詢的名字創(chuàng)建一個查詢器,然后用resolve()函數(shù)解析它。如果成功,它至少會返回一個入口。你可以利用返回的迭代器,使用第一個入口或者遍歷整個列表來拿到全部的入口。
給定一個端點,可以獲得他的地址,端口和IP協(xié)議(v4或者v6):
std::cout << ep.address().to_string() << ":" << ep.port()
<< "/" << ep.protocol() << std::endl;
Boost.Asio有三種類型的套接字類:ip::tcp, ip::udp和ip::icmp。當然它也是可擴展的,你可以創(chuàng)建自己的socket類,盡管這相當復雜。如果你選擇這樣做,參照一下boost/asio/ip/tcp.hpp, boost/asio/ip/udp.hpp和boost/asio/ip/icmp.hpp。它們都是含有內(nèi)部typedef關鍵字的超小類。
你可以把ip::tcp, ip::udp, ip::icmp類當作占位符;它們可以讓你便捷地訪問其他類/函數(shù),如下所示:
socket類創(chuàng)建一個相應的socket。而且總是在構造的時候傳入io_service實例:
io_service service;
ip::udp::socket sock(service)
sock.set_option(ip::udp::socket::reuse_address(true));
每一個socket的名字都是一個typedef關鍵字
所有的同步函數(shù)都有拋出異?;蛘叻祷劐e誤碼的重載,比如下面的代碼片段:
sync_func( arg1, arg2 ... argN); // 拋出異常
boost::system::error_code ec;
sync_func( arg1 arg2, ..., argN, ec); // 返回錯誤碼
在這一章剩下的部分,你會見到大量的同步函數(shù)。簡單起見,我省略了有返回錯誤碼的重載,但是不可否認它們確實是存在的。
這些方法被分成了幾組。并不是所有的方法都可以在各個類型的套接字里使用。這個部分的結尾將有一個列表來展示各個方法分別屬于哪個socket類。
注意所有的異步方法都立刻返回,而它們相對的同步實現(xiàn)需要操作完成之后才能返回。
這些方法是用來連接或綁定socket、斷開socket字連接以及查詢連接是活動還是非活動的:
例子如下:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4()); n
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
char buff[1024]; sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();
這些是在套接字上執(zhí)行I/O操作的函數(shù)。
對于異步函數(shù)來說,處理程序的格式void handler(const boost::system::error_code& e, size_t bytes)都是一樣的
稍后我們將討論緩沖區(qū)。讓我們先來了解一下標記。標記的默認值是0,但是也可以是以下幾種:
你最常用的可能是message_peek,使用方法請參照下面的代碼片段:
char buff[1024];
sock.receive(buffer(buff), ip::tcp::socket::message_peek );
memset(buff,1024, 0);
// 重新讀取之前已經(jīng)讀取過的內(nèi)容
sock.receive(buffer(buff) );
下面的是一些教你如何同步或異步地從不同類型的套接字上讀取數(shù)據(jù)的例子:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
std::cout << "bytes available " << sock.available() << std::endl;
char buff[512];
size_t read = sock.read_some(buffer(buff));
ip::udp::socket sock(service);
sock.open(ip::udp::v4());
ip::udp::endpoint receiver_ep("87.248.112.181", 80);
sock.send_to(buffer("testing\n"), receiver_ep);
char buff[512];
ip::udp::endpoint sender_ep;
sock.receive_from(buffer(buff), sender_ep);
[?注意:就像上述代碼片段所展示的那樣,使用receive_from從一個UDP套接字讀取數(shù)據(jù)時,你需要構造一個默認的端點]
using namespace boost::asio;
io_service service;
ip::udp::socket sock(service);
boost::asio::ip::udp::endpoint sender_ep;
char buff[512];
void on_read(const boost::system::error_code & err, std::size_t read_bytes) {
std::cout << "read " << read_bytes << std::endl;
sock.async_receive_from(buffer(buff), sender_ep, on_read);
}
int main(int argc, char* argv[]) {
ip::udp::endpoint ep(ip::address::from_string("127.0.0.1"),
8001);
sock.open(ep.protocol());
sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
sock.bind(ep);
sock.async_receive_from(buffer(buff,512), sender_ep, on_read);
service.run();
}
這些函數(shù)用來處理套接字的高級選項:
這些是你可以獲取/設置的套接字選項:
名字 | 描述 | 類型 |
---|---|---|
broadcast | 如果為true,允許廣播消息 | bool |
debug | 如果為true,啟用套接字級別的調(diào)試 | bool |
do_not_route | 如果為true,則阻止路由選擇只使用本地接口 | bool |
enable_connection_aborted | 如果為true,記錄在accept()時中斷的連接 | bool |
keep_alive | 如果為true,會發(fā)送心跳 | bool |
linger | 如果為true,套接字會在有未發(fā)送數(shù)據(jù)的情況下掛起close() | bool |
receive_buffer_size | 套接字接收緩沖區(qū)大小 | int |
receive_low_watemark | 規(guī)定套接字輸入處理的最小字節(jié)數(shù) | int |
reuse_address | 如果為true,套接字能綁定到一個已用的地址 | bool |
send_buffer_size | 套接字發(fā)送緩沖區(qū)大小 | int |
send_low_watermark | 規(guī)定套接字數(shù)據(jù)發(fā)送的最小字節(jié)數(shù) | int |
ip::v6_only | 如果為true,則只允許IPv6的連接 | bool |
每個名字代表了一個內(nèi)部套接字typedef或者類。下面是對它們的使用:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
// TCP套接字可以重用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
// 獲取套接字讀取的數(shù)據(jù)
ip::tcp::socket::receive_buffer_size rbs;
sock.get_option(rbs);
std::cout << rbs.value() << std::endl;
// 把套接字的緩沖區(qū)大小設置為8192
ip::tcp::socket::send_buffer_size sbs(8192);
sock.set_option(sbs);
[?在上述特性工作之前,套接字要被打開。否則,會拋出異常]
就像我之前所說,不是所有的成員方法在所有的套接字類中都可用。我做了一個包含成員函數(shù)不同點的列表。如果一個成員函數(shù)沒有出現(xiàn)在這,說明它在所有的套接字類都是可用的。
名字 | TCP | UDP | ICMP |
---|---|---|---|
async_read_some | 是 | - | - |
async_receive_from | - | 是 | 是 |
async_write_some | 是 | - | - |
async_send_to | - | 是 | 是 |
read_some | 是 | - | - |
receive_from | - | 是 | 是 |
write_some | 是 | - | - |
send_to | - | 是 | 是 |
其他與連接和I/O無關的函數(shù)如下:
最后要注意的一點,套接字實例不能被拷貝,因為拷貝構造方法和=操作符是不可訪問的。
ip::tcp::socket s1(service), s2(service);
s1 = s2; // 編譯時報錯
ip::tcp::socket s3(s1); // 編譯時報錯
這是非常有意義的,因為每一個實例都擁有并管理著一個資源(原生套接字本身)。如果我們允許拷貝構造,結果是我們會有兩個實例擁有同樣的原生套接字;這樣我們就需要去處理所有者的問題(讓一個實例擁有所有權?或者使用引用計數(shù)?還是其他的方法)Boost.Asio選擇不允許拷貝(如果你想要創(chuàng)建一個備份,請使用共享指針)
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
socket_ptr sock1(new ip::tcp::socket(service));
socket_ptr sock2(sock1); // ok
socket_ptr sock3;
sock3 = sock1; // ok
當從一個套接字讀寫內(nèi)容時,你需要一個緩沖區(qū),用來保存讀取和寫入的數(shù)據(jù)。緩沖區(qū)內(nèi)存的有效時間必須比I/O操作的時間要長;你需要保證它們在I/O操作結束之前不被釋放。 對于同步操作來說,這很容易;當然,這個緩沖區(qū)在receive和send時都存在。
char buff[512];
...
sock.receive(buffer(buff));
strcpy(buff, "ok\n");
sock.send(buffer(buff));
但是在異步操作時就沒這么簡單了,看下面的代碼片段:
// 非常差勁的代碼 ...
void on_read(const boost::system::error_code & err, std::size_t read_bytes)
{ ... }
void func() {
char buff[512];
sock.async_receive(buffer(buff), on_read);
}
在我們調(diào)用async_receive()之后,buff就已經(jīng)超出有效范圍,它的內(nèi)存當然會被釋放。當我們開始從套接字接收一些數(shù)據(jù)時,我們會把它們拷貝到一片已經(jīng)不屬于我們的內(nèi)存中;它可能會被釋放,或者被其他代碼重新開辟來存入其他的數(shù)據(jù),結果就是:內(nèi)存沖突。
對于上面的問題有幾個解決方案:
第一個方法顯然不是很好,因為我們都知道全局變量非常不好。此外,如果兩個實例使用同一個緩沖區(qū)怎么辦?
下面是第二種方式的實現(xiàn):
void on_read(char * ptr, const boost::system::error_code & err, std::size_t read_bytes) {
delete[] ptr;
}
....
char * buff = new char[512];
sock.async_receive(buffer(buff, 512), boost::bind(on_read,buff,_1,_2))
或者,如果你想要緩沖區(qū)在操作結束后自動超出范圍,使用共享指針
struct shared_buffer {
boost::shared_array<char> buff;
int size;
shared_buffer(size_t size) : buff(new char[size]), size(size) {
}
mutable_buffers_1 asio_buff() const {
return buffer(buff.get(), size);
}
};
// 當on_read超出范圍時, boost::bind對象被釋放了,
// 同時也會釋放共享指針
void on_read(shared_buffer, const boost::system::error_code & err, std::size_t read_bytes) {}
sock.async_receive(buff.asio_buff(), boost::bind(on_read,buff,_1,_2));
shared_buffer類擁有實質的shared_array<>,shared_array<>存在的目的是用來保存shared_buffer實例的拷貝-當最后一個share_array<>元素超出范圍時,shared_array<>就被自動銷毀了,而這就是我們想要的結果。
因為Boost.Asio會給完成處理句柄保留一個拷貝,當操作完成時就會調(diào)用這個完成處理句柄,所以你的目的達到了。那個拷貝是一個boost::bind的仿函數(shù),它擁有著實際的shared_buffer實例。這是非常優(yōu)雅的!
第三個選擇是使用一個連接對象來管理套接字和其他數(shù)據(jù),比如緩沖區(qū),通常來說這是正確的解決方案但是非常復雜。在這一章的末尾我們會對這種方法進行討論。
縱觀所有代碼,你會發(fā)現(xiàn):無論什么時候,當我們需要對一個buffer進行讀寫操作時,代碼會把實際的緩沖區(qū)對象封裝在一個buffer()方法中,然后再把它傳遞給方法調(diào)用:
char buff[512];
sock.async_receive(buffer(buff), on_read);
基本上我們都會把緩沖區(qū)包含在一個類中以便Boost.Asio的方法能遍歷這個緩沖區(qū),比方說,使用下面的代碼:
sock.async_receive(some_buffer, on_read);
實例some_buffer需要滿足一些需求,叫做ConstBufferSequence或者MutableBufferSequence(你可以在Boost.Asio的文檔中查看它們)。創(chuàng)建你自己的類去處理這些需求的細節(jié)是非常復雜的,但是Boost.Asio已經(jīng)提供了一些類用來處理這些需求。所以你不用直接訪問這些緩沖區(qū),而可以使用buffer()方法。
自信地講,你可以把下面列出來的類型都包裝到一個buffer()方法中:
下面的代碼都是有效的:
struct pod_sample { int i; long l; char c; };
...
char b1[512];
void * b2 = new char[512];
std::string b3; b3.resize(128);
pod_sample b4[16];
std::vector<pod_sample> b5; b5.resize(16);
boost::array<pod_sample,16> b6;
std::array<pod_sample,16> b7;
sock.async_send(buffer(b1), on_read);
sock.async_send(buffer(b2,512), on_read);
sock.async_send(buffer(b3), on_read);
sock.async_send(buffer(b4), on_read);
sock.async_send(buffer(b5), on_read);
sock.async_send(buffer(b6), on_read);
sock.async_send(buffer(b7), on_read);
總的來說就是:與其創(chuàng)建你自己的類來處理ConstBufferSequence或者MutableBufferSequence的需求,不如創(chuàng)建一個能在你需要的時候保留緩沖區(qū),然后返回一個mutable_buffers_1實例的類,而我們早在shared_buffer類中就這樣做了。
Boost.Asio提供了處理I/O的自由函數(shù),我們分四組來分析它們。
這些方法把套接字連接到一個端點。
它的例子如下:
using namespace boost::asio::ip;
tcp::resolver resolver(service);
tcp::resolver::iterator iter = resolver.resolve(tcp::resolver::query("www.yahoo.com","80"));
tcp::socket sock(service);
connect(sock, iter);
一個主機名可以被解析成多個地址,而connect和async_connect能很好地把你從嘗試每個地址然后找到一個可用地址的繁重工作中解放出來,因為它們已經(jīng)幫你做了這些。
這些方法對一個流進行讀寫操作(可以是套接字,或者其他表現(xiàn)得像流的類):
async_read(stream, stream_buffer [, completion], handler)
async_write(strean, stream_buffer [, completion], handler)
write(stream, stream_buffer [, completion])
read(stream, stream_buffer [, completion])
首先,要注意第一個參數(shù)變成了流,而不單是socket。這個參數(shù)包含了socket但不僅僅是socket。比如,你可以用一個Windows的文件句柄來替代socket。 當下面情況出現(xiàn)時,所有read和write操作都會結束:
下面的代碼會異步地從一個socket中間讀取數(shù)據(jù)直到讀取到’\n’:
io_service service;
ip::tcp::socket sock(service);
char buff[512];
int offset = 0;
size_t up_to_enter(const boost::system::error_code &, size_t bytes) {
for ( size_t i = 0; i < bytes; ++i)
if ( buff[i + offset] == '\n')
return 0;
return 1;
}
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), up_to_enter, on_read);
Boost.Asio也提供了一些簡單的完成處理仿函數(shù):
例子如下:
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
// 讀取32個字節(jié)
async_read(sock, buffer(buff), transfer_exactly(32), on_read);
上述的4個方法,不使用普通的緩沖區(qū),而使用由Boost.Asio的std::streambuf類繼承來的stream_buffer方法。stl流和流緩沖區(qū)非常復雜;下面是例子:
io_service service;
void on_read(streambuf& buf, const boost::system::error_code &, size_t) {
std::istream in(&buf);
std::string line;
std::getline(in, line);
std::cout << "first line: " << line << std::endl;
}
int main(int argc, char* argv[]) {
HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
windows::stream_handle h(service, file);
streambuf buf;
async_read(h, buf, transfer_exactly(256), boost::bind(on_read,boost::ref(buf),_1,_2));
service.run();
}
在這里,我向你們展示了如何在一個Windows文件句柄上調(diào)用async_read。讀取前256個字符,然后把它們保存到緩沖區(qū)中,當操作結束時。on_read被調(diào)用,再創(chuàng)建std::istream用來傳遞緩沖區(qū),讀取第一行(std::getline),最后把它輸出到命令行中。
這些方法在條件滿足之前會一直讀?。?
下面這個例子在讀到一個指定的標點符號之前會一直讀取:
typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator, bool> match_punct(iterator begin, iterator end) {
while ( begin != end)
if ( std::ispunct(*begin))
return std::make_pair(begin,true);
return std::make_pair(end,false);
}
void on_read(const boost::system::error_code &, size_t) {}
...
streambuf buf;
async_read_until(sock, buf, match_punct, on_read);
如果我們想讀到一個空格時就結束,我們需要把最后一行修改為:
async_read_until(sock, buff, ' ', on_read);
這些方法用來在一個流上面做隨機存取操作。由你來指定read和write操作從什么地方開始(offset):
這些方法不支持套接字。它們用來處理流的隨機訪問;也就是說,流是可以隨機訪問的。套接字顯然不是這樣(套接字是不可回溯的)。
下面這個例子告訴你怎么從一個文件偏移為256的位置讀取128個字節(jié):
io_service service;
int main(int argc, char* argv[]) {
HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
windows::random_access_handle h(service, file);
streambuf buf;
read_at(h, 256, buf, transfer_exactly(128));
std::istream in(&buf);
std::string line;
std::getline(in, line);
std::cout << "first line: " << line << std::endl;
}
這部分對異步編程時可能碰到的一些問題進行了深入的探究。我建議你先讀一遍,然后在接下來讀這本書的過程中,再經(jīng)?;剡^頭來看看,從而增強你對這部分的理解。
就像我之前所說的,同步編程比異步編程簡單很多。這是因為,線性的思考是很簡單的(調(diào)用A,調(diào)用A結束,調(diào)用B,調(diào)用B結束,然后繼續(xù),這是以事件處理的方式來思考)。后面你會碰到這種情況,比如:五件事情,你不知道它們執(zhí)行的順序,也不知道他們是否會執(zhí)行!
盡管異步編程更難,但是你會更傾向于選擇使用它,比如:寫一個需要處理很多并發(fā)訪問的服務端。并發(fā)訪問越多,異步編程就比同步編程越簡單。
假設:你有一個需要處理1000個并發(fā)訪問的應用,從客戶端發(fā)給服務端的每個信息都會再返回給客戶端,以‘\n’結尾。
同步方式的代碼,1個線程:
using namespace boost::asio;
struct client {
ip::tcp::socket sock;
char buff[1024]; // 每個信息最多這么大
int already_read; // 你已經(jīng)讀了多少
};
std::vector<client> clients;
void handle_clients() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() ) on_read(clients[i]);
}
void on_read(client & c) {
int to_read = std::min( 1024 - c.already_read, c.sock.available());
c.sock.read_some( buffer(c.buff + c.already_read, to_read));
c.already_read += to_read;
if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
std::string msg(c.buff, c.buff + pos);
std::copy(c.buff + pos, c.buff + 1024, c.buff);
c.already_read -= pos;
on_read_msg(c, msg);
}
}
void on_read_msg(client & c, const std::string & msg) {
// 分析消息,然后返回
if ( msg == "request_login")
c.sock.write( "request_ok\n");
else if ...
}
有一種情況是在任何服務端(和任何基于網(wǎng)絡的應用)都需要避免的,就是代碼無響應的情況。在我們的例子里,我們需要handle_clients()方法盡可能少的阻塞。如果方法在某個點上阻塞,任何進來的信息都需要等待方法解除阻塞才能被處理。
為了保持響應,只在一個套接字有數(shù)據(jù)的時候我們才讀,也就是說,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read時,我們只讀當前可用的;調(diào)用read_until(c.sock, buffer(...), '\n')會是一個非常糟糕的選擇,因為直到我們從一個指定的客戶端讀取了完整的消息之前,它都是阻塞的(我們永遠不知道它什么時候會讀取到完整的消息)
這里的瓶頸就是on_read_msg()方法;當它執(zhí)行時,所有進來的消息都在等待。一個良好的on_read_msg()方法實現(xiàn)會保證這種情況基本不會發(fā)生,但是它還是會發(fā)生(有時候向一個套接字寫入數(shù)據(jù),緩沖區(qū)滿了時,它會被阻塞) 同步方式的代碼,10個線程
using namespace boost::asio;
struct client {
// ... 和之前一樣
bool set_reading() {
boost::mutex::scoped_lock lk(cs_);
if ( is_reading_) return false; // 已經(jīng)在讀取
else { is_reading_ = true; return true; }
}
void unset_reading() {
boost::mutex::scoped_lock lk(cs_);
is_reading_ = false;
}
private:
boost::mutex cs_;
bool is_reading_;
};
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < 10; ++i)
boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() )
if ( clients[i].set_reading()) {
on_read(clients[i]);
clients[i].unset_reading();
}
}
void on_read(client & c) {
// 和之前一樣
}
void on_read_msg(client & c, const std::string & msg) {
// 和之前一樣
}
為了使用多線程,我們需要對線程進行同步,這就是set_reading()和set_unreading()所做的。set_reading()方法非常重要,比如你想要一步實現(xiàn)“判斷是否在讀取然后標記為讀取中”。但這是有兩步的(“判斷是否在讀取”和“標記為讀取中”),你可能會有兩個線程同時為一個客戶端判斷是否在讀取,然后你會有兩個線程同時為一個客戶端調(diào)用on_read,結果就是數(shù)據(jù)沖突甚至導致應用崩潰。
你會發(fā)現(xiàn)代碼變得極其復雜。
同步編程有第三個選擇,就是為每個連接開辟一個線程。但是當并發(fā)的線程增加時,這就成了一種災難性的情況。
然后,讓我們來看異步編程。我們不斷地異步讀取。當一個客戶端請求某些東西時,on_read被調(diào)用,然后回應,然后等待下一個請求(然后開始另外一個異步的read操作)。
異步方式的代碼,10個線程
using namespace boost::asio;
io_service service;
struct client {
ip::tcp::socket sock;
streambuf buff; // 從客戶端取回結果
}
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < clients.size(); ++i)
async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2));
for ( int i = 0; i < 10; ++i)
boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
std::istream in(&c.buff);
std::string msg;
std::getline(in, msg);
if ( msg == "request_login")
c.sock.async_write( "request_ok\n", on_write);
else if ...
...
// 等待同一個客戶端下一個讀取操作
async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2));
}
發(fā)現(xiàn)代碼變得有多簡單了吧?client結構里面只有兩個成員,handle_clients()僅僅調(diào)用了async_read_until,然后它創(chuàng)建了10個線程,每個線程都調(diào)用service.run()。這些線程會處理所有來自客戶端的異步read操作,然后分發(fā)所有向客戶端的異步write操作。另外需要注意的一件事情是:on_read()一直在為下一次異步read操作做準備(看最后一行代碼)。
為了實現(xiàn)監(jiān)聽循環(huán),io_service類提供了4個方法,比如:run(), run_one(), poll()和poll_one()。雖然大多數(shù)時候使用service.run()就可以,但是你還是需要在這里學習其他方法實現(xiàn)的功能。
再一次說明,如果有等待執(zhí)行的操作,run()會一直執(zhí)行,直到你手動調(diào)用io_service::stop()。為了保證io_service一直執(zhí)行,通常你添加一個或者多個異步操作,然后在它們被執(zhí)行時,你繼續(xù)一直不停地添加異步操作,比如下面代碼:
using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes)
{
// ... 處理讀取操作 ...
sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
sock.async_connect(ep, on_connect);
service.run();
}
我在之前說過異步方法的handler是在調(diào)用了io_service::run的線程里被調(diào)用的。因為在至少90%~95%的時候,這是你唯一要用到的方法,所以我就把它說得簡單了。對于調(diào)用了run_one(), poll()或者poll_one()的線程這一點也是適用的。
run_one()方法最多執(zhí)行和分發(fā)一個異步操作:
你可以認為下面兩段代碼是等效的:
io_service service;
service.run(); // 或者
while ( !service.stopped()) service.run_once();
你可以使用run_once()啟動一個異步操作,然后等待它執(zhí)行完成。
io_service service;
bool write_complete = false;
void on_write(const boost::system::error_code & err, size_t bytes)
{ write_complete = true; }
…
std::string data = "login ok”;
write_complete = false;
async_write(sock, buffer(data), on_write);
do service.run_once() while (!write_complete);
還有一些使用run_one()方法的例子,包含在Boost.Asio諸如blocking_tcp_client.cpp和blocking_udp_client.cpp的文件中。
poll_one方法以非阻塞的方式最多運行一個準備好的等待操作:
操作正在等待并準備以非阻塞方式運行,通常意味著如下的情況:
你可以使用poll_one去保證所有I/O操作的handler完成運行,同時做一些其他的工作
io_service service;
while ( true) {
// 運行所有完成了IO操作的handler
while ( service.poll_one()) ;
// ... 在這里做其他的事情 …
}
poll()方法會以非阻塞的方式運行所有等待的操作。下面兩段代碼是等效的:
io_service service;
service.poll(); // 或者
while ( service.poll_one()) ;
所有上述方法都會在失敗的時候拋出boost::system::system_error異常。這是我們所不希望發(fā)生的事情;這里拋出的異常通常都是致命的,也許是資源耗盡,或者是你handler的其中一個拋出了異常。另外,每個方法都有一個不拋出異常,而是返回一個boost::system::error_code的重載:
io_service service;
boost::system::error_code err = 0;
service.run(err);
if ( err) std::cout << "Error " << err << std::endl;
異步工作不僅僅指用異步地方式接受客戶端到服務端的連接、異步地從一個socket讀取或者寫入到socket。它包含了所有可以異步執(zhí)行的操作。
默認情況下,你是不知道每個異步handler的調(diào)用順序的。除了通常的異步調(diào)用(來自異步socket的讀取/寫入/接收)。你可以使用service.post()來使你的自定義方法被異步地調(diào)用。例如:
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}
void worker_thread() {
service.run();
}
int main(int argc, char* argv[]) {
for ( int i = 0; i < 10; ++i)
service.post(boost::bind(func, i));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有線程被創(chuàng)建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}
在上面的例子中,service.post(some_function)添加了一個異步方法調(diào)用。
這個方法在某一個調(diào)用了service.run()的線程中請求io_service實例,然后調(diào)用給定的some_funtion之后立即返回。在我們的例子中,這個線程是我們之前創(chuàng)建的三個線程中的一個。你不能確定異步方法調(diào)用的順序。你不要期待它們會以我們調(diào)用post()方法的順序來調(diào)用。下面是運行之前代碼可能得到的結果:
func called, i= 0
func called, i= 2
func called, i= 1
func called, i= 4
func called, i= 3
func called, i= 6
func called, i= 7
func called, i= 8
func called, i= 5
func called, i= 9
有時候你會想讓一些異步處理方法順序執(zhí)行。比如,你去一個餐館(go_to_restaurant),下單(order),然后吃(eat)。你需要先去餐館,然后下單,最后吃。這樣的話,你需要用到io_service::strand,這個方法會讓你的異步方法被順序調(diào)用??聪旅娴睦樱?/p>
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
service.run();
}
int main(int argc, char* argv[])
{
io_service::strand strand_one(service), strand_two(service);
for ( int i = 0; i < 5; ++i)
service.post( strand_one.wrap( boost::bind(func, i)));
for ( int i = 5; i < 10; ++i)
service.post( strand_two.wrap( boost::bind(func, i)));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有線程被創(chuàng)建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}
在上述代碼中,我們保證前面的5個線程和后面的5個線程是順序執(zhí)行的。func called, i = 0在func called, i = 1之前被調(diào)用,然后調(diào)用func called, i = 2……同樣func called, i = 5在func called, i = 6之前,func called, i = 6在func called, i = 7被調(diào)用……你需要注意的是盡管方法是順序調(diào)用的,但是不意味著它們都在同一個線程執(zhí)行。運行這個程序可能得到的一個結果如下:
func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138
Boost.Asio提供了三種讓你把處理方法添加為異步調(diào)用的方式:
在之前的章節(jié)中你會看到關于service.post()的一個例子,以及運行這個例子可能得到的一種結果。我們對它做一些修改,然后看看service.dispatch()是怎么影響輸出的結果的:
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}
void run_dispatch_and_post() {
for ( int i = 0; i < 10; i += 2) {
service.dispatch(boost::bind(func, i));
service.post(boost::bind(func, i + 1));
}
}
int main(int argc, char* argv[]) {
service.post(run_dispatch_and_post);
service.run();
}
在解釋發(fā)生了什么之前,我們先運行程序,觀察結果:
func called, i= 0
func called, i= 2
func called, i= 4
func called, i= 6
func called, i= 8
func called, i= 1
func called, i= 3
func called, i= 5
func called, i= 7
func called, i= 9
偶數(shù)先輸出,然后是奇數(shù)。這是因為我用dispatch()輸出偶數(shù),然后用post()輸出奇數(shù)。dispatch()會在返回之前調(diào)用hanlder,因為當前的線程調(diào)用了service.run(),而post()每次都立即返回了。
現(xiàn)在,讓我們講講service.wrap(handler)。wrap()返回了一個仿函數(shù),它可以用來做另外一個方法的參數(shù):
using namespace boost::asio;
io_service service;
void dispatched_func_1() {
std::cout << "dispatched 1" << std::endl;
}
void dispatched_func_2() {
std::cout << "dispatched 2" << std::endl;
}
void test(boost::function<void()> func) {
std::cout << "test" << std::endl;
service.dispatch(dispatched_func_1);
func();
}
void service_run() {
service.run();
}
int main(int argc, char* argv[]) {
test( service.wrap(dispatched_func_2));
boost::thread th(service_run);
boost::this_thread::sleep( boost::posix_time::millisec(500));
th.join();
}
test(service.wrap(dispatched_func_2));會把dispatched_ func_2包裝起來創(chuàng)建一個仿函數(shù),然后傳遞給test當作一個參數(shù)。當test()被調(diào)用時,它會分發(fā)調(diào)用方法1,然后調(diào)用func()。這時,你會發(fā)現(xiàn)調(diào)用func()和service.dispatch(dispatched_func_2)是等價的,因為它們是連續(xù)調(diào)用的。程序的輸出證明了這一點:
test
dispatched 1
dispatched 2
io_service::strand 類(用來序列化異步調(diào)用)也包含了poll(), dispatch()和 wrap()等成員函數(shù)。它們的作用和io_service的poll(), dispatch()和wrap()是一樣的。然而,大多數(shù)情況下你只需要把io_service::strand::wrap()方法做為io_service::poll()或者io_service::dispatch()方法的參數(shù)即可。
假設你需要做下面的操作:
io_service service;
ip::tcp::socket sock(service);
char buff[512];
...
read(sock, buffer(buff));
在這個例子中,sock和buff的存在時間都必須比read()調(diào)用的時間要長。也就是說,在調(diào)用read()返回之前,它們都必須有效。這就是你所期望的;你傳給一個方法的所有參數(shù)在方法內(nèi)部都必須有效。當我們采用異步方式時,事情會變得比較復雜。
io_service service;
ip::tcp::socket sock(service);
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), on_read);
在這個例子中,sock和buff的存在時間都必須比read()操作本身時間要長,但是read操作持續(xù)的時間我們是不知道的,因為它是異步的。
當使用socket緩沖區(qū)的時候,你會有一個buffer實例在異步調(diào)用時一直存在(使用boost::shared_array<>)。在這里,我們可以使用同樣的方式,通過創(chuàng)建一個類并在其內(nèi)部管理socket和它的讀寫緩沖區(qū)。然后,對于所有的異步操作,傳遞一個包含智能指針的boost::bind仿函數(shù)給它:
using namespace boost::asio;
io_service service;
struct connection : boost::enable_shared_from_this<connection> {
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<connection> ptr;
connection() : sock_(service), started_(true) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1));
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
private:
void on_connect(const error_code & err) {
// 這里你決定用這個連接做什么: 讀取或者寫入
if ( !err) do_read();
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( !started() ) return;
std::string msg(read_buffer_, bytes);
if ( msg == "can_login") do_write("access_data");
else if ( msg.find("data ") == 0) process_data(msg);
else if ( msg == "login_fail") stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
void do_read() {
sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
// 注意: 因為在做另外一個async_read操作之前你想要發(fā)送多個消息,
// 所以你需要多個寫入buffer
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2));
}
void process_data(const std::string & msg) {
// 處理服務端來的內(nèi)容,然后啟動另外一個寫入操作
}
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
};
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
connection::ptr(new connection)->start(ep);
}
在所有異步調(diào)用中,我們傳遞一個boost::bind仿函數(shù)當作參數(shù)。這個仿函數(shù)內(nèi)部包含了一個智能指針,指向connection實例。只要有一個異步操作等待時,Boost.Asio就會保存boost::bind仿函數(shù)的拷貝,這個拷貝保存了指向連接實例的一個智能指針,從而保證connection實例保持活動。問題解決!
當然,connection類僅僅是一個框架類;你需要根據(jù)你的需求對它進行調(diào)整(它看起來會和當前服務端例子的情況相當不同)。
你需要注意的是創(chuàng)建一個新的連接是相當簡單的:connection::ptr(new connection)- >start(ep)。這個方法啟動了到服務端的(異步)連接。當你需要關閉這個連接時,調(diào)用stop()。
當實例被啟動時(start()),它會等待客戶端的連接。當連接發(fā)生時。on_connect()被調(diào)用。如果沒有錯誤發(fā)生,它啟動一個read操作(do_read())。當read操作結束時,你就可以解析這個消息;當然你應用的on_read()看起來會各種各樣。而當你寫回一個消息時,你需要把它拷貝到緩沖區(qū),然后像我在do_write()方法中所做的一樣將其發(fā)送出去,因為這個緩沖區(qū)同樣需要在這個異步寫操作中一直存活。最后需要注意的一點——當寫回時,你需要指定寫入的數(shù)量,否則,整個緩沖區(qū)都會被發(fā)送出去。
網(wǎng)絡api實際上要繁雜得多,這個章節(jié)只是做為一個參考,當你在實現(xiàn)自己的網(wǎng)絡應用時可以回過頭來看看。
Boost.Asio實現(xiàn)了端點的概念,你可以認為是IP和端口。如果你不知道準確的IP,你可以使用resolver對象將主機名,例如www.yahoo.com轉換為一個或多個IP地址。
我們也可以看到API的核心——socket類。Boost.Asio提供了TCP、UDP和 ICMP的實現(xiàn)。而且你還可以用你自己的協(xié)議來對它進行擴展;當然,這個工作不適合缺乏勇氣的人。
異步編程是剛需。你應該已經(jīng)明白為什么有時候需要用到它,尤其在寫服務端的時候。調(diào)用service.run()來實現(xiàn)異步循環(huán)就已經(jīng)可以讓你很滿足,但是有時候你需要更進一步,嘗試使用run_one()、poll()或者poll_one()。
當實現(xiàn)異步時,你可以異步執(zhí)行你自己的方法;使用service.post()或者service.dispatch()。
最后,為了使socket和緩沖區(qū)(read或者write)在整個異步操作的生命周期中一直活動,我們需要采取特殊的防護措施。你的連接類需要繼承自enabled_shared_from_this,然后在內(nèi)部保存它需要的緩沖區(qū),而且每次異步調(diào)用都要傳遞一個智能指針給this操作。
下一章會進行實戰(zhàn)操作;在實現(xiàn)回顯客戶端/服務端應用時會有大量的編程實踐。
更多建議: