在這一章,我們將會實現(xiàn)一個小的客戶端/服務端應用,這可能會是你寫過的最簡單的客戶端/服務端應用?;仫@應用就是一個把客戶端發(fā)過來的任何內容回顯給其本身,然后關閉連接的的服務端。這個服務端可以處理任何數(shù)量的客戶端。每個客戶端連接之后發(fā)送一個消息,服務端接收到完成消息后把它發(fā)送回去。在那之后,服務端關閉連接。
因此,每個回顯客戶端連接到服務端,發(fā)送一個消息,然后讀取服務端返回的結果,確保這是它發(fā)送給服務端的消息就結束和服務端的會話。
我們首先實現(xiàn)一個同步應用,然后實現(xiàn)一個異步應用,以便你可以很容易對比他們:
為了節(jié)省空間,下面的代碼有一些被裁剪掉了。你可以在附加在這本書的代碼中看到全部的代碼。
對于TCP而言,我們需要一個額外的保證;每一個消息以換行符結束(‘\n’)。編寫一個同步回顯服務端/客戶端非常簡單。
我們會展示編碼內容,比如同步客戶端,同步服務端,異步客戶端和異步服務端。
在大多數(shù)有價值的例子中,客戶端通常比服務端編碼要簡單(因為服務端需要處理多個客戶端請求)。
下面的代碼展示了不符合這條規(guī)則的一個例外:
size_t read_complete(char * buf, const error_code & err, size_t bytes)
{
if ( err) return 0;
bool found = std::find(buf, buf + bytes, '\n') < buf + bytes;
// 我們一個一個讀取直到讀到回車,不緩存
return found ? 0 : 1;
}
void sync_echo(std::string msg) {
msg += "\n”;
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer(msg));
char buf[1024];
int bytes = read(sock, buffer(buf), boost::bind(read_complete,buf,_1,_2));
std::string copy(buf, bytes - 1);
msg = msg.substr(0, msg.size() - 1);
std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl;
sock.close();
}
int main(int argc, char* argv[]) {
char* messages[] = { "John says hi", "so does James", "Lucy just got home", "Boost.Asio is Fun!", 0 };
boost::thread_group threads;
for ( char ** message = messages; *message; ++message) {
threads.create_thread( boost::bind(sync_echo, *message));
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
threads.join_all();
}
核心功能sync_echo。它包含了連接到服務端,發(fā)送信息然后等待回顯的所有邏輯。
你會發(fā)現(xiàn),在讀取時,我使用了自由函數(shù)read(),因為我想要讀’\n’之前的所有內容。sock.read_some()方法滿足不了這個要求,因為它只會讀可用的,而不是全部的消息。
read()方法的第三個參數(shù)是完成處理句柄。當讀取到完整消息時,它返回0。否則,它會返回我下一步(直到讀取結束)能都到的最大的緩沖區(qū)大小。在我們的例子中,返回結果始終是1,因為我永遠不想讀的消息比我們需要的更多。
在main()中,我們創(chuàng)建了幾個線程;每個線程負責把消息發(fā)送到客戶端,然后等待操作結束。如果你運行這個程序,你會看到下面的輸出:
server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK
server echoed our Boost.Asio is Fun!: OK
注意:因為我們是同步的,所以不需要調用service.run()。
回顯同步服務端的編寫非常容易,參考如下的代碼片段:
io_service service;
size_t read_complete(char * buff, const error_code & err, size_t bytes) {
if ( err) return 0;
bool found = std::find(buff, buff + bytes, '\n') < buff + bytes;
// 我們一個一個讀取直到讀到回車,不緩存
return found ? 0 : 1;
}
void handle_connections() {
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
char buff[1024];
while ( true) {
ip::tcp::socket sock(service);
acceptor.accept(sock);
int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2));
std::string msg(buff, bytes);
sock.write_some(buffer(msg));
sock.close();
}
}
int main(int argc, char* argv[]) {
handle_connections();
}
服務端的邏輯主要在handle_connections()。因為是單線程,我們接受一個客戶端請求,讀取它發(fā)送給我們的消息,然后回顯,然后等待下一個連接。可以確定,當兩個客戶端同時連接時,第二個客戶端需要等待服務端處理完第一個客戶端的請求。
還是要注意因為我們是同步的,所以不需要調用service.run()。
當我們開始異步時,編碼會變得稍微有點復雜。我們會構建在第二章 保持活動中展示的connection類。
觀察這個章節(jié)中接下來的代碼,你會發(fā)現(xiàn)每個異步操作啟動了新的異步操作,以保持service.run()一直工作。
首先,核心功能如下:
#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr> , boost::noncopyable {
typedef talk_to_svr self_type;
talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, MEM_FN1(on_connect,_1));
}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_svr> ptr;
static ptr start(ip::tcp::endpoint ep, const std::string &message) {
ptr new_(new talk_to_svr(message));
new_->start(ep);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string message_;
};
我們需要一直使用指向talk_to_svr的智能指針,這樣的話當在tack_to_svr的實例上有異步操作時,那個實例是一直活動的。為了避免錯誤,比如在棧上構建一個talk_to_svr對象的實例時,我把構造方法設置成了私有而且不允許拷貝構造(繼承自boost::noncopyable)。
我們有了核心方法,比如start(),stop()和started(),它們所做的事情也正如它們名字表達的一樣。如果需要建立連接,調用talk_to_svr::start(endpoint, message)即可。我們同時還有一個read緩沖區(qū)和一個write緩沖區(qū)。(readbuufer和writebuffer)。
MEM_FN 是一個方便使用的宏,它們通過shared_ptr_from_this()方法強制使用一個指向 this 的智能指針。
下面的幾行代碼和之前的解釋非常不同:
//等同于 "sock_.async_connect(ep, MEM_FN1(on_connect,_1));"
sock_.async_connect(ep,boost::bind(&talk_to_svr::on_connect,shared_ptr_from_this(),_1));
sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,this,_1));
在上述例子中,我們正確的創(chuàng)建了async_connect的完成處理句柄;在調用完成處理句柄之前它會保留一個指向talk_to_server實例的智能指針,從而保證當其發(fā)生時talk_to_server實例還是保持活動的。
在接下來的例子中,我們錯誤地創(chuàng)建了完成處理句柄,當它被調用時,talk_to_server實例很可能已經(jīng)被釋放了。
從socket讀取或寫入時,你使用如下的代碼片段:
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// 和TCP客戶端中的類似
}
do_read()方法會保證當on_read()被調用的時候,我們從服務端讀取一行。do_write()方法會先把信息拷貝到緩沖區(qū)(考慮到當async_write發(fā)生時msg可能已經(jīng)超出范圍被釋放),然后保證實際的寫入操作發(fā)生時on_write()被調用。
然后是最重要的方法,這個方法包含了類的主要邏輯:
void on_connect(const error_code & err) {
if ( !err) do_write(message_ + "\n");
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( !err) {
std::string copy(read_buffer_, bytes - 1);
std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl;
}
stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
當連接成功之后,我們發(fā)送消息到服務端,do_write()。當write操作結束時,on_write()被調用,它初始化了一個do_read()方法,當do_read()完成時。on_read()被調用;這里,我們簡單的檢查一下返回的信息是否是服務端的回顯,然后退出服務。
我們會發(fā)送三個消息到服務端讓它變得更有趣一點:
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
for ( char ** message = messages; *message; ++message) {
talk_to_svr::start( ep, *message);
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
service.run();
}
上述的代碼會生成如下的輸出:
server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK
核心功能和同步服務端的功能類似,如下:
class talk_to_client : public boost::enable_shared_from_this<talk_to_
client>, boost::noncopyable {
typedef talk_to_client self_type;
talk_to_client() : sock_(service), started_(false) {}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_client> ptr;
void start() {
started_ = true;
do_read();
}
static ptr new_() {
ptr new_(new talk_to_client);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
ip::tcp::socket & sock() { return sock_;}
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
};
因為我們是非常簡單的回顯服務,這里不需要is_started()方法。對每個客戶端,僅僅讀取它的消息,回顯,然后關閉它。
do_read(),do_write()和read_complete()方法和TCP同步服務端的完全一致。
主要的邏輯同樣是在on_read()和on_write()方法中:
void on_read(const error_code & err, size_t bytes) {
if ( !err) {
std::string msg(read_buffer_, bytes);
do_write(msg + "\n");
}
stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
對客戶端的處理如下:
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
client->start();
talk_to_client::ptr new_client = talk_to_client::new_();
acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
service.run();
}
每一次客戶端連接到服務時,handle_accept被調用,它會異步地從客戶端讀取,然后同樣異步地等待一個新的客戶端。
你會在這本書相應的代碼中得到所有4個應用(TCP回顯同步客戶端,TCP回顯同步服務端,TCP回顯異步客戶端,TCP回顯異步服務端)。當測試時,你可以使用任意客戶端/服務端組合(比如,一個異步客戶端和一個同步服務端)。
因為UDP不能保證所有信息都抵達接收者,我們不能保證“信息以回車結尾”。 沒收到消息,我們只是回顯,但是沒有socket去關閉(在服務端),因為我們是UDP。
UDP回顯客戶端比TCP回顯客戶端要簡單:
ip::udp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void sync_echo(std::string msg) {
ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 0));
sock.send_to(buffer(msg), ep);
char buff[1024];
ip::udp::endpoint sender_ep;
int bytes = sock.receive_from(buffer(buff), sender_ep);
std::string copy(buff, bytes);
std::cout << "server echoed our " << msg << ": " << (copy == msg ? "OK" : "FAIL") << std::endl;
sock.close();
}
int main(int argc, char* argv[]) {
char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
boost::thread_group threads;
for ( char ** message = messages; *message; ++message) {
threads.create_thread( boost::bind(sync_echo, *message));
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
threads.join_all();
}
所有的邏輯都在synch_echo()中;連接到服務端,發(fā)送消息,接收服務端的回顯,然后關閉連接。
UDP回顯服務端會是你寫過的最簡單的服務端:
io_service service;
void handle_connections() {
char buff[1024];
ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 8001));
while ( true) {
ip::udp::endpoint sender_ep;
int bytes = sock.receive_from(buffer(buff), sender_ep);
std::string msg(buff, bytes);
sock.send_to(buffer(msg), sender_ep);
}
}
int main(int argc, char* argv[]) {
handle_connections();
}
它非常簡單,而且能很好的自釋。
我把異步UDP客戶端和服務端留給讀者當作一個練習。
我們已經(jīng)寫了完整的應用,最終讓Boost.Asio得以工作?;仫@應用是開始學習一個庫時非常好的工具。你可以經(jīng)常學習和運行這個章節(jié)所展示的代碼,這樣你就可以非常容易地記住這個庫的基礎。 在下一章,我們會建立更復雜的客戶端/服務端應用,我們要確保避免低級錯誤,比如內存泄漏,死鎖等等。
更多建議: