在這一章,我們將會(huì)實(shí)現(xiàn)一個(gè)小的客戶端/服務(wù)端應(yīng)用,這可能會(huì)是你寫過的最簡(jiǎn)單的客戶端/服務(wù)端應(yīng)用?;仫@應(yīng)用就是一個(gè)把客戶端發(fā)過來的任何內(nèi)容回顯給其本身,然后關(guān)閉連接的的服務(wù)端。這個(gè)服務(wù)端可以處理任何數(shù)量的客戶端。每個(gè)客戶端連接之后發(fā)送一個(gè)消息,服務(wù)端接收到完成消息后把它發(fā)送回去。在那之后,服務(wù)端關(guān)閉連接。
因此,每個(gè)回顯客戶端連接到服務(wù)端,發(fā)送一個(gè)消息,然后讀取服務(wù)端返回的結(jié)果,確保這是它發(fā)送給服務(wù)端的消息就結(jié)束和服務(wù)端的會(huì)話。
我們首先實(shí)現(xiàn)一個(gè)同步應(yīng)用,然后實(shí)現(xiàn)一個(gè)異步應(yīng)用,以便你可以很容易對(duì)比他們:
為了節(jié)省空間,下面的代碼有一些被裁剪掉了。你可以在附加在這本書的代碼中看到全部的代碼。
對(duì)于TCP而言,我們需要一個(gè)額外的保證;每一個(gè)消息以換行符結(jié)束(‘\n’)。編寫一個(gè)同步回顯服務(wù)端/客戶端非常簡(jiǎn)單。
我們會(huì)展示編碼內(nèi)容,比如同步客戶端,同步服務(wù)端,異步客戶端和異步服務(wù)端。
在大多數(shù)有價(jià)值的例子中,客戶端通常比服務(wù)端編碼要簡(jiǎn)單(因?yàn)榉?wù)端需要處理多個(gè)客戶端請(qǐng)求)。
下面的代碼展示了不符合這條規(guī)則的一個(gè)例外:
size_t read_complete(char * buf, const error_code & err, size_t bytes)
{
if ( err) return 0;
bool found = std::find(buf, buf + bytes, '\n') < buf + bytes;
// 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
return found ? 0 : 1;
}
void sync_echo(std::string msg) {
msg += "\n”;
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer(msg));
char buf[1024];
int bytes = read(sock, buffer(buf), boost::bind(read_complete,buf,_1,_2));
std::string copy(buf, bytes - 1);
msg = msg.substr(0, msg.size() - 1);
std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl;
sock.close();
}
int main(int argc, char* argv[]) {
char* messages[] = { "John says hi", "so does James", "Lucy just got home", "Boost.Asio is Fun!", 0 };
boost::thread_group threads;
for ( char ** message = messages; *message; ++message) {
threads.create_thread( boost::bind(sync_echo, *message));
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
threads.join_all();
}
核心功能sync_echo。它包含了連接到服務(wù)端,發(fā)送信息然后等待回顯的所有邏輯。
你會(huì)發(fā)現(xiàn),在讀取時(shí),我使用了自由函數(shù)read(),因?yàn)槲蚁胍x’\n’之前的所有內(nèi)容。sock.read_some()方法滿足不了這個(gè)要求,因?yàn)樗粫?huì)讀可用的,而不是全部的消息。
read()方法的第三個(gè)參數(shù)是完成處理句柄。當(dāng)讀取到完整消息時(shí),它返回0。否則,它會(huì)返回我下一步(直到讀取結(jié)束)能都到的最大的緩沖區(qū)大小。在我們的例子中,返回結(jié)果始終是1,因?yàn)槲矣肋h(yuǎn)不想讀的消息比我們需要的更多。
在main()中,我們創(chuàng)建了幾個(gè)線程;每個(gè)線程負(fù)責(zé)把消息發(fā)送到客戶端,然后等待操作結(jié)束。如果你運(yùn)行這個(gè)程序,你會(huì)看到下面的輸出:
server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK
server echoed our Boost.Asio is Fun!: OK
注意:因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。
回顯同步服務(wù)端的編寫非常容易,參考如下的代碼片段:
io_service service;
size_t read_complete(char * buff, const error_code & err, size_t bytes) {
if ( err) return 0;
bool found = std::find(buff, buff + bytes, '\n') < buff + bytes;
// 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
return found ? 0 : 1;
}
void handle_connections() {
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
char buff[1024];
while ( true) {
ip::tcp::socket sock(service);
acceptor.accept(sock);
int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2));
std::string msg(buff, bytes);
sock.write_some(buffer(msg));
sock.close();
}
}
int main(int argc, char* argv[]) {
handle_connections();
}
服務(wù)端的邏輯主要在handle_connections()。因?yàn)槭菃尉€程,我們接受一個(gè)客戶端請(qǐng)求,讀取它發(fā)送給我們的消息,然后回顯,然后等待下一個(gè)連接??梢源_定,當(dāng)兩個(gè)客戶端同時(shí)連接時(shí),第二個(gè)客戶端需要等待服務(wù)端處理完第一個(gè)客戶端的請(qǐng)求。
還是要注意因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。
當(dāng)我們開始異步時(shí),編碼會(huì)變得稍微有點(diǎn)復(fù)雜。我們會(huì)構(gòu)建在第二章 保持活動(dòng)中展示的connection類。
觀察這個(gè)章節(jié)中接下來的代碼,你會(huì)發(fā)現(xiàn)每個(gè)異步操作啟動(dòng)了新的異步操作,以保持service.run()一直工作。
首先,核心功能如下:
#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr> , boost::noncopyable {
typedef talk_to_svr self_type;
talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, MEM_FN1(on_connect,_1));
}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_svr> ptr;
static ptr start(ip::tcp::endpoint ep, const std::string &message) {
ptr new_(new talk_to_svr(message));
new_->start(ep);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string message_;
};
我們需要一直使用指向talk_to_svr的智能指針,這樣的話當(dāng)在tack_to_svr的實(shí)例上有異步操作時(shí),那個(gè)實(shí)例是一直活動(dòng)的。為了避免錯(cuò)誤,比如在棧上構(gòu)建一個(gè)talk_to_svr對(duì)象的實(shí)例時(shí),我把構(gòu)造方法設(shè)置成了私有而且不允許拷貝構(gòu)造(繼承自boost::noncopyable)。
我們有了核心方法,比如start(),stop()和started(),它們所做的事情也正如它們名字表達(dá)的一樣。如果需要建立連接,調(diào)用talk_to_svr::start(endpoint, message)即可。我們同時(shí)還有一個(gè)read緩沖區(qū)和一個(gè)write緩沖區(qū)。(readbuufer和writebuffer)。
MEM_FN 是一個(gè)方便使用的宏,它們通過shared_ptr_from_this()方法強(qiáng)制使用一個(gè)指向 this 的智能指針。
下面的幾行代碼和之前的解釋非常不同:
//等同于 "sock_.async_connect(ep, MEM_FN1(on_connect,_1));"
sock_.async_connect(ep,boost::bind(&talk_to_svr::on_connect,shared_ptr_from_this(),_1));
sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,this,_1));
在上述例子中,我們正確的創(chuàng)建了async_connect的完成處理句柄;在調(diào)用完成處理句柄之前它會(huì)保留一個(gè)指向talk_to_server實(shí)例的智能指針,從而保證當(dāng)其發(fā)生時(shí)talk_to_server實(shí)例還是保持活動(dòng)的。
在接下來的例子中,我們錯(cuò)誤地創(chuàng)建了完成處理句柄,當(dāng)它被調(diào)用時(shí),talk_to_server實(shí)例很可能已經(jīng)被釋放了。
從socket讀取或?qū)懭霑r(shí),你使用如下的代碼片段:
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// 和TCP客戶端中的類似
}
do_read()方法會(huì)保證當(dāng)on_read()被調(diào)用的時(shí)候,我們從服務(wù)端讀取一行。do_write()方法會(huì)先把信息拷貝到緩沖區(qū)(考慮到當(dāng)async_write發(fā)生時(shí)msg可能已經(jīng)超出范圍被釋放),然后保證實(shí)際的寫入操作發(fā)生時(shí)on_write()被調(diào)用。
然后是最重要的方法,這個(gè)方法包含了類的主要邏輯:
void on_connect(const error_code & err) {
if ( !err) do_write(message_ + "\n");
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( !err) {
std::string copy(read_buffer_, bytes - 1);
std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl;
}
stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
當(dāng)連接成功之后,我們發(fā)送消息到服務(wù)端,do_write()。當(dāng)write操作結(jié)束時(shí),on_write()被調(diào)用,它初始化了一個(gè)do_read()方法,當(dāng)do_read()完成時(shí)。on_read()被調(diào)用;這里,我們簡(jiǎn)單的檢查一下返回的信息是否是服務(wù)端的回顯,然后退出服務(wù)。
我們會(huì)發(fā)送三個(gè)消息到服務(wù)端讓它變得更有趣一點(diǎn):
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
for ( char ** message = messages; *message; ++message) {
talk_to_svr::start( ep, *message);
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
service.run();
}
上述的代碼會(huì)生成如下的輸出:
server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK
核心功能和同步服務(wù)端的功能類似,如下:
class talk_to_client : public boost::enable_shared_from_this<talk_to_
client>, boost::noncopyable {
typedef talk_to_client self_type;
talk_to_client() : sock_(service), started_(false) {}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_client> ptr;
void start() {
started_ = true;
do_read();
}
static ptr new_() {
ptr new_(new talk_to_client);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
ip::tcp::socket & sock() { return sock_;}
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
};
因?yàn)槲覀兪欠浅:?jiǎn)單的回顯服務(wù),這里不需要is_started()方法。對(duì)每個(gè)客戶端,僅僅讀取它的消息,回顯,然后關(guān)閉它。
do_read(),do_write()和read_complete()方法和TCP同步服務(wù)端的完全一致。
主要的邏輯同樣是在on_read()和on_write()方法中:
void on_read(const error_code & err, size_t bytes) {
if ( !err) {
std::string msg(read_buffer_, bytes);
do_write(msg + "\n");
}
stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
對(duì)客戶端的處理如下:
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
client->start();
talk_to_client::ptr new_client = talk_to_client::new_();
acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
service.run();
}
每一次客戶端連接到服務(wù)時(shí),handle_accept被調(diào)用,它會(huì)異步地從客戶端讀取,然后同樣異步地等待一個(gè)新的客戶端。
你會(huì)在這本書相應(yīng)的代碼中得到所有4個(gè)應(yīng)用(TCP回顯同步客戶端,TCP回顯同步服務(wù)端,TCP回顯異步客戶端,TCP回顯異步服務(wù)端)。當(dāng)測(cè)試時(shí),你可以使用任意客戶端/服務(wù)端組合(比如,一個(gè)異步客戶端和一個(gè)同步服務(wù)端)。
因?yàn)閁DP不能保證所有信息都抵達(dá)接收者,我們不能保證“信息以回車結(jié)尾”。 沒收到消息,我們只是回顯,但是沒有socket去關(guān)閉(在服務(wù)端),因?yàn)槲覀兪荱DP。
UDP回顯客戶端比TCP回顯客戶端要簡(jiǎn)單:
ip::udp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void sync_echo(std::string msg) {
ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 0));
sock.send_to(buffer(msg), ep);
char buff[1024];
ip::udp::endpoint sender_ep;
int bytes = sock.receive_from(buffer(buff), sender_ep);
std::string copy(buff, bytes);
std::cout << "server echoed our " << msg << ": " << (copy == msg ? "OK" : "FAIL") << std::endl;
sock.close();
}
int main(int argc, char* argv[]) {
char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
boost::thread_group threads;
for ( char ** message = messages; *message; ++message) {
threads.create_thread( boost::bind(sync_echo, *message));
boost::this_thread::sleep( boost::posix_time::millisec(100));
}
threads.join_all();
}
所有的邏輯都在synch_echo()中;連接到服務(wù)端,發(fā)送消息,接收服務(wù)端的回顯,然后關(guān)閉連接。
UDP回顯服務(wù)端會(huì)是你寫過的最簡(jiǎn)單的服務(wù)端:
io_service service;
void handle_connections() {
char buff[1024];
ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 8001));
while ( true) {
ip::udp::endpoint sender_ep;
int bytes = sock.receive_from(buffer(buff), sender_ep);
std::string msg(buff, bytes);
sock.send_to(buffer(msg), sender_ep);
}
}
int main(int argc, char* argv[]) {
handle_connections();
}
它非常簡(jiǎn)單,而且能很好的自釋。
我把異步UDP客戶端和服務(wù)端留給讀者當(dāng)作一個(gè)練習(xí)。
我們已經(jīng)寫了完整的應(yīng)用,最終讓Boost.Asio得以工作?;仫@應(yīng)用是開始學(xué)習(xí)一個(gè)庫時(shí)非常好的工具。你可以經(jīng)常學(xué)習(xí)和運(yùn)行這個(gè)章節(jié)所展示的代碼,這樣你就可以非常容易地記住這個(gè)庫的基礎(chǔ)。 在下一章,我們會(huì)建立更復(fù)雜的客戶端/服務(wù)端應(yīng)用,我們要確保避免低級(jí)錯(cuò)誤,比如內(nèi)存泄漏,死鎖等等。
更多建議: