[譯]回顯服務(wù)端/客戶端

2018-06-19 15:34 更新

回顯服務(wù)端/客戶端

在這一章,我們將會(huì)實(shí)現(xiàn)一個(gè)小的客戶端/服務(wù)端應(yīng)用,這可能會(huì)是你寫過的最簡(jiǎn)單的客戶端/服務(wù)端應(yīng)用?;仫@應(yīng)用就是一個(gè)把客戶端發(fā)過來的任何內(nèi)容回顯給其本身,然后關(guān)閉連接的的服務(wù)端。這個(gè)服務(wù)端可以處理任何數(shù)量的客戶端。每個(gè)客戶端連接之后發(fā)送一個(gè)消息,服務(wù)端接收到完成消息后把它發(fā)送回去。在那之后,服務(wù)端關(guān)閉連接。

因此,每個(gè)回顯客戶端連接到服務(wù)端,發(fā)送一個(gè)消息,然后讀取服務(wù)端返回的結(jié)果,確保這是它發(fā)送給服務(wù)端的消息就結(jié)束和服務(wù)端的會(huì)話。

我們首先實(shí)現(xiàn)一個(gè)同步應(yīng)用,然后實(shí)現(xiàn)一個(gè)異步應(yīng)用,以便你可以很容易對(duì)比他們:

這里寫圖片描述

為了節(jié)省空間,下面的代碼有一些被裁剪掉了。你可以在附加在這本書的代碼中看到全部的代碼。

TCP回顯服務(wù)端/客戶端

對(duì)于TCP而言,我們需要一個(gè)額外的保證;每一個(gè)消息以換行符結(jié)束(‘\n’)。編寫一個(gè)同步回顯服務(wù)端/客戶端非常簡(jiǎn)單。

我們會(huì)展示編碼內(nèi)容,比如同步客戶端,同步服務(wù)端,異步客戶端和異步服務(wù)端。

TCP同步客戶端

在大多數(shù)有價(jià)值的例子中,客戶端通常比服務(wù)端編碼要簡(jiǎn)單(因?yàn)榉?wù)端需要處理多個(gè)客戶端請(qǐng)求)。

下面的代碼展示了不符合這條規(guī)則的一個(gè)例外:

size_t read_complete(char * buf, const error_code & err, size_t bytes)
{
    if ( err) return 0;
    bool found = std::find(buf, buf + bytes, '\n') < buf + bytes;
    // 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
    return found ? 0 : 1;
}
void sync_echo(std::string msg) {
    msg += "\n”;
    ip::tcp::socket sock(service);
    sock.connect(ep);
    sock.write_some(buffer(msg));
    char buf[1024];
    int bytes = read(sock, buffer(buf), boost::bind(read_complete,buf,_1,_2));
    std::string copy(buf, bytes - 1);
    msg = msg.substr(0, msg.size() - 1);
    std::cout << "server echoed our " << msg << ": "<< (copy == msg ? "OK" : "FAIL") << std::endl;
    sock.close();
}
int main(int argc, char* argv[]) {
    char* messages[] = { "John says hi", "so does James", "Lucy just got home", "Boost.Asio is Fun!", 0 };
    boost::thread_group threads;
    for ( char ** message = messages; *message; ++message) {
        threads.create_thread( boost::bind(sync_echo, *message));
        boost::this_thread::sleep( boost::posix_time::millisec(100));
    }
    threads.join_all();
}

核心功能sync_echo。它包含了連接到服務(wù)端,發(fā)送信息然后等待回顯的所有邏輯。

你會(huì)發(fā)現(xiàn),在讀取時(shí),我使用了自由函數(shù)read(),因?yàn)槲蚁胍x’\n’之前的所有內(nèi)容。sock.read_some()方法滿足不了這個(gè)要求,因?yàn)樗粫?huì)讀可用的,而不是全部的消息。

read()方法的第三個(gè)參數(shù)是完成處理句柄。當(dāng)讀取到完整消息時(shí),它返回0。否則,它會(huì)返回我下一步(直到讀取結(jié)束)能都到的最大的緩沖區(qū)大小。在我們的例子中,返回結(jié)果始終是1,因?yàn)槲矣肋h(yuǎn)不想讀的消息比我們需要的更多。

main()中,我們創(chuàng)建了幾個(gè)線程;每個(gè)線程負(fù)責(zé)把消息發(fā)送到客戶端,然后等待操作結(jié)束。如果你運(yùn)行這個(gè)程序,你會(huì)看到下面的輸出:

server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK
server echoed our Boost.Asio is Fun!: OK

注意:因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。

TCP同步服務(wù)端

回顯同步服務(wù)端的編寫非常容易,參考如下的代碼片段:

io_service service;
size_t read_complete(char * buff, const error_code & err, size_t bytes) {
    if ( err) return 0;
    bool found = std::find(buff, buff + bytes, '\n') < buff + bytes;
    // 我們一個(gè)一個(gè)讀取直到讀到回車,不緩存
    return found ? 0 : 1;
}
void handle_connections() {
    ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
    char buff[1024];
    while ( true) {
        ip::tcp::socket sock(service);
        acceptor.accept(sock);
        int bytes = read(sock, buffer(buff), boost::bind(read_complete,buff,_1,_2));
        std::string msg(buff, bytes);
        sock.write_some(buffer(msg));
        sock.close();
    }
}
int main(int argc, char* argv[]) {
    handle_connections();
}

服務(wù)端的邏輯主要在handle_connections()。因?yàn)槭菃尉€程,我們接受一個(gè)客戶端請(qǐng)求,讀取它發(fā)送給我們的消息,然后回顯,然后等待下一個(gè)連接??梢源_定,當(dāng)兩個(gè)客戶端同時(shí)連接時(shí),第二個(gè)客戶端需要等待服務(wù)端處理完第一個(gè)客戶端的請(qǐng)求。

還是要注意因?yàn)槲覀兪峭降?,所以不需要調(diào)用service.run()。

TCP異步客戶端

當(dāng)我們開始異步時(shí),編碼會(huì)變得稍微有點(diǎn)復(fù)雜。我們會(huì)構(gòu)建在第二章 保持活動(dòng)中展示的connection類。

觀察這個(gè)章節(jié)中接下來的代碼,你會(huì)發(fā)現(xiàn)每個(gè)異步操作啟動(dòng)了新的異步操作,以保持service.run()一直工作。

首先,核心功能如下:

#define MEM_FN(x)       boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y)    boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z)  boost::bind(&self_type::x, shared_from_this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr> , boost::noncopyable {
    typedef talk_to_svr self_type;
    talk_to_svr(const std::string & message) : sock_(service), started_(true), message_(message) {}
    void start(ip::tcp::endpoint ep) {
        sock_.async_connect(ep, MEM_FN1(on_connect,_1));
    }
public:
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<talk_to_svr> ptr;
    static ptr start(ip::tcp::endpoint ep, const std::string &message) {
        ptr new_(new talk_to_svr(message));
        new_->start(ep);
        return new_;
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
    }
    bool started() { return started_; }
    ...
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
    std::string message_; 
}; 

我們需要一直使用指向talk_to_svr的智能指針,這樣的話當(dāng)在tack_to_svr的實(shí)例上有異步操作時(shí),那個(gè)實(shí)例是一直活動(dòng)的。為了避免錯(cuò)誤,比如在棧上構(gòu)建一個(gè)talk_to_svr對(duì)象的實(shí)例時(shí),我把構(gòu)造方法設(shè)置成了私有而且不允許拷貝構(gòu)造(繼承自boost::noncopyable)。

我們有了核心方法,比如start(),stop()started(),它們所做的事情也正如它們名字表達(dá)的一樣。如果需要建立連接,調(diào)用talk_to_svr::start(endpoint, message)即可。我們同時(shí)還有一個(gè)read緩沖區(qū)和一個(gè)write緩沖區(qū)。(readbuuferwritebuffer)。

MEM_FN 是一個(gè)方便使用的宏,它們通過shared_ptr_from_this()方法強(qiáng)制使用一個(gè)指向 this 的智能指針。

下面的幾行代碼和之前的解釋非常不同:

//等同于 "sock_.async_connect(ep, MEM_FN1(on_connect,_1));"
sock_.async_connect(ep,boost::bind(&talk_to_svr::on_connect,shared_ptr_from_this(),_1));
sock_.async_connect(ep, boost::bind(&talk_to_svr::on_connect,this,_1));

在上述例子中,我們正確的創(chuàng)建了async_connect的完成處理句柄;在調(diào)用完成處理句柄之前它會(huì)保留一個(gè)指向talk_to_server實(shí)例的智能指針,從而保證當(dāng)其發(fā)生時(shí)talk_to_server實(shí)例還是保持活動(dòng)的。

在接下來的例子中,我們錯(cuò)誤地創(chuàng)建了完成處理句柄,當(dāng)它被調(diào)用時(shí),talk_to_server實(shí)例很可能已經(jīng)被釋放了。

從socket讀取或?qū)懭霑r(shí),你使用如下的代碼片段:

void do_read() {
    async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
    if ( !started() ) return;
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    // 和TCP客戶端中的類似
}

do_read()方法會(huì)保證當(dāng)on_read()被調(diào)用的時(shí)候,我們從服務(wù)端讀取一行。do_write()方法會(huì)先把信息拷貝到緩沖區(qū)(考慮到當(dāng)async_write發(fā)生時(shí)msg可能已經(jīng)超出范圍被釋放),然后保證實(shí)際的寫入操作發(fā)生時(shí)on_write()被調(diào)用。

然后是最重要的方法,這個(gè)方法包含了類的主要邏輯:

void on_connect(const error_code & err) {
    if ( !err)      do_write(message_ + "\n");
    else            stop();
}
void on_read(const error_code & err, size_t bytes) {
    if ( !err) {
        std::string copy(read_buffer_, bytes - 1);
        std::cout << "server echoed our " << message_ << ": " << (copy == message_ ? "OK" : "FAIL") << std::endl; 
    }
    stop(); 
}
void on_write(const error_code & err, size_t bytes) {
    do_read();
} 

當(dāng)連接成功之后,我們發(fā)送消息到服務(wù)端,do_write()。當(dāng)write操作結(jié)束時(shí),on_write()被調(diào)用,它初始化了一個(gè)do_read()方法,當(dāng)do_read()完成時(shí)。on_read()被調(diào)用;這里,我們簡(jiǎn)單的檢查一下返回的信息是否是服務(wù)端的回顯,然后退出服務(wù)。

我們會(huì)發(fā)送三個(gè)消息到服務(wù)端讓它變得更有趣一點(diǎn):

int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
    char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
    for ( char ** message = messages; *message; ++message) {
        talk_to_svr::start( ep, *message);
        boost::this_thread::sleep( boost::posix_time::millisec(100));
    }
    service.run();
}

上述的代碼會(huì)生成如下的輸出:

server echoed our John says hi: OK
server echoed our so does James: OK
server echoed our Lucy just got home: OK

TCP異步服務(wù)端

核心功能和同步服務(wù)端的功能類似,如下:

class talk_to_client : public boost::enable_shared_from_this<talk_to_
   client>, boost::noncopyable {
    typedef talk_to_client self_type;
    talk_to_client() : sock_(service), started_(false) {}
public:
    typedef boost::system::error_code error_code;
    typedef boost::shared_ptr<talk_to_client> ptr;
    void start() {
        started_ = true;
        do_read(); 
    }


    static ptr new_() {
        ptr new_(new talk_to_client);
        return new_;
    }
    void stop() {
        if ( !started_) return;
        started_ = false;
        sock_.close();
    }
    ip::tcp::socket & sock() { return sock_;}
    ...
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    char read_buffer_[max_msg];
    char write_buffer_[max_msg];
    bool started_;
};

因?yàn)槲覀兪欠浅:?jiǎn)單的回顯服務(wù),這里不需要is_started()方法。對(duì)每個(gè)客戶端,僅僅讀取它的消息,回顯,然后關(guān)閉它。

do_read(),do_write()read_complete()方法和TCP同步服務(wù)端的完全一致。

主要的邏輯同樣是在on_read()on_write()方法中:

void on_read(const error_code & err, size_t bytes) {
    if ( !err) {
        std::string msg(read_buffer_, bytes);
        do_write(msg + "\n");
    }
    stop(); 
}
void on_write(const error_code & err, size_t bytes) {
    do_read();
}

對(duì)客戶端的處理如下:

ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
    client->start();
    talk_to_client::ptr new_client = talk_to_client::new_();
    acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) {
    talk_to_client::ptr client = talk_to_client::new_();
    acceptor.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
    service.run();
} 

每一次客戶端連接到服務(wù)時(shí),handle_accept被調(diào)用,它會(huì)異步地從客戶端讀取,然后同樣異步地等待一個(gè)新的客戶端。

代碼

你會(huì)在這本書相應(yīng)的代碼中得到所有4個(gè)應(yīng)用(TCP回顯同步客戶端,TCP回顯同步服務(wù)端,TCP回顯異步客戶端,TCP回顯異步服務(wù)端)。當(dāng)測(cè)試時(shí),你可以使用任意客戶端/服務(wù)端組合(比如,一個(gè)異步客戶端和一個(gè)同步服務(wù)端)。

UDP回顯服務(wù)端/客戶端

因?yàn)閁DP不能保證所有信息都抵達(dá)接收者,我們不能保證“信息以回車結(jié)尾”。 沒收到消息,我們只是回顯,但是沒有socket去關(guān)閉(在服務(wù)端),因?yàn)槲覀兪荱DP。

UDP同步回顯客戶端

UDP回顯客戶端比TCP回顯客戶端要簡(jiǎn)單:

ip::udp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void sync_echo(std::string msg) {
    ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 0));
    sock.send_to(buffer(msg), ep);
    char buff[1024];
    ip::udp::endpoint sender_ep;
    int bytes = sock.receive_from(buffer(buff), sender_ep);
    std::string copy(buff, bytes);
    std::cout << "server echoed our " << msg << ": " << (copy == msg ? "OK" : "FAIL") << std::endl;
    sock.close();
}
int main(int argc, char* argv[]) {
    char* messages[] = { "John says hi", "so does James", "Lucy got home", 0 };
    boost::thread_group threads;
    for ( char ** message = messages; *message; ++message) {
        threads.create_thread( boost::bind(sync_echo, *message));
        boost::this_thread::sleep( boost::posix_time::millisec(100));
    }
    threads.join_all();
}

所有的邏輯都在synch_echo()中;連接到服務(wù)端,發(fā)送消息,接收服務(wù)端的回顯,然后關(guān)閉連接。

UDP同步回顯服務(wù)端

UDP回顯服務(wù)端會(huì)是你寫過的最簡(jiǎn)單的服務(wù)端:

io_service service;
void handle_connections() {
    char buff[1024];
    ip::udp::socket sock(service, ip::udp::endpoint(ip::udp::v4(), 8001));
    while ( true) {
        ip::udp::endpoint sender_ep;
        int bytes = sock.receive_from(buffer(buff), sender_ep);
        std::string msg(buff, bytes);
        sock.send_to(buffer(msg), sender_ep);
    } 
}
int main(int argc, char* argv[]) {
    handle_connections();
} 

它非常簡(jiǎn)單,而且能很好的自釋。

我把異步UDP客戶端和服務(wù)端留給讀者當(dāng)作一個(gè)練習(xí)。

總結(jié)

我們已經(jīng)寫了完整的應(yīng)用,最終讓Boost.Asio得以工作?;仫@應(yīng)用是開始學(xué)習(xí)一個(gè)庫時(shí)非常好的工具。你可以經(jīng)常學(xué)習(xí)和運(yùn)行這個(gè)章節(jié)所展示的代碼,這樣你就可以非常容易地記住這個(gè)庫的基礎(chǔ)。 在下一章,我們會(huì)建立更復(fù)雜的客戶端/服務(wù)端應(yīng)用,我們要確保避免低級(jí)錯(cuò)誤,比如內(nèi)存泄漏,死鎖等等。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)