[譯]同步VS異步

2018-06-19 15:35 更新

同步VS異步

Boost.Asio的作者做了一個(gè)很驚艷的工作:它可以讓你在同步和異步中自由選擇,從而更好地適應(yīng)你的應(yīng)用。

在之前的章節(jié)中,我們已經(jīng)學(xué)習(xí)了各種類(lèi)型應(yīng)用的框架,比如同步客戶端,同步服務(wù)端,異步客戶端,異步服務(wù)端。它們中的每一個(gè)都可以作為你應(yīng)用的基礎(chǔ)。如果要更加深入地學(xué)習(xí)各種類(lèi)型應(yīng)用的細(xì)節(jié),請(qǐng)繼續(xù)。

混合同步異步編程

Boost.Asio庫(kù)允許你進(jìn)行同步和異步的混合編程。我個(gè)人認(rèn)為這是一個(gè)壞主意,但是Boost.Asio(就像C++一樣)在你需要的時(shí)候允許你深入底層。

通常來(lái)說(shuō),當(dāng)你寫(xiě)一個(gè)異步應(yīng)用時(shí),你會(huì)很容易掉入這個(gè)陷阱。比如在響應(yīng)一個(gè)異步write操作時(shí),你做了一個(gè)同步read操作:

io_service service;
ip::tcp::socket sock(service);
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void on_write(boost::system::error_code err, size_t bytes) {
    char read_buff[512];
    read(sock, buffer(read_buff));
}
async_write(sock, buffer("echo"), on_write);

毫無(wú)疑問(wèn),同步read操作會(huì)阻塞當(dāng)前的線程,從而導(dǎo)致其他任何正在等待的異步操作變成掛起狀態(tài)(對(duì)這個(gè)線程)。這是一段糟糕的代碼,因?yàn)樗鼤?huì)導(dǎo)致整個(gè)應(yīng)用變得無(wú)響應(yīng)或者整個(gè)被阻塞掉(所有異步運(yùn)行的端點(diǎn)都必須避免阻塞,而執(zhí)行一個(gè)同步的操作違反了這個(gè)原則)。

當(dāng)你寫(xiě)一個(gè)同步應(yīng)用時(shí),你不大可能執(zhí)行異步的read或者write操作,因?yàn)橥降厮伎家呀?jīng)意味著用一種線性的方式思考(執(zhí)行A,然后執(zhí)行B,再執(zhí)行C,等等)。

我唯一能想到的同步和異步同時(shí)工作的場(chǎng)景就是同步操作和異步操作是完全隔離的,比如,同步和異步從一個(gè)數(shù)據(jù)庫(kù)進(jìn)行讀寫(xiě)。

從客戶端傳遞信息到服務(wù)端VS從服務(wù)端傳遞信息到客戶端

成功的客戶端/服務(wù)端應(yīng)用一個(gè)很重要的部分就是來(lái)回傳遞消息(服務(wù)端到客戶端和客戶端到服務(wù)端)。你需要指定用什么來(lái)標(biāo)記一個(gè)消息。換句話說(shuō),當(dāng)讀取一個(gè)輸入的消息時(shí),你怎么判斷它被完整讀取了?

標(biāo)記消息結(jié)尾的方式完全取決于你(標(biāo)記消息的開(kāi)始很簡(jiǎn)單,因?yàn)樗褪乔耙粋€(gè)消息之后傳遞過(guò)來(lái)的第一個(gè)字節(jié)),但是要保證消息是簡(jiǎn)單且連續(xù)的。

你可以:

  • 消息大小固定(這不是一個(gè)很好的主意,如果我們需要發(fā)送更多的數(shù)據(jù)怎么辦?)
  • 通過(guò)一個(gè)特殊的字符標(biāo)記消息的結(jié)尾,比如’\n’或者’\0’
  • 在消息的頭部指定消息的大小

我在整本書(shū)中間采用的方式都是“使用’\n’標(biāo)記消息的結(jié)尾”。所以,每次讀取一條消息都會(huì)如下:

char buff_[512];
// 同步讀取
read(sock_, buffer(buff_), boost::bind(&read_complete, this, _1, _2));
// 異步讀取
async_read(sock_, buffer(buff_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
    if ( err) return 0;
    already_read_ = bytes;
    bool found = std::find(buff_, buff_ + bytes, '\n') < buff_ + bytes;
    // 一個(gè)一個(gè)讀,直到讀到回車(chē),無(wú)緩存
    return found ? 0 : 1;
} 

我把在消息頭部指定消息長(zhǎng)度這種方式作為一個(gè)練習(xí)留給讀者;這非常簡(jiǎn)單。

客戶端應(yīng)用中的同步I/O

同步客戶端一般都能歸類(lèi)到如下兩種情況中的一種:

  • 它向服務(wù)端請(qǐng)求一些東西,讀取結(jié)果,然后處理它們。然后請(qǐng)求一些其他的東西,然后一直持續(xù)下去。事實(shí)上,這很像之前章節(jié)里說(shuō)到的同步客戶端。
  • 從服務(wù)端讀取消息,處理它,然后寫(xiě)回結(jié)果。然后讀取另外一條消息,然后一直持續(xù)下去。

這里寫(xiě)圖片描述

兩種情況都使用“發(fā)送請(qǐng)求-讀取結(jié)果”的策略。換句話說(shuō),一個(gè)部分發(fā)送一個(gè)請(qǐng)求到另外一個(gè)部分然后另外一個(gè)部分返回結(jié)果。這是實(shí)現(xiàn)客戶端/服務(wù)端應(yīng)用非常簡(jiǎn)單的一種方式,同時(shí)這也是我非常推薦的一種方式。

你可以創(chuàng)建一個(gè)Mambo Jambo類(lèi)型的客戶端服務(wù)端應(yīng)用,你可以隨心所欲地寫(xiě)它們中間的任何一個(gè)部分,但是這會(huì)導(dǎo)致一場(chǎng)災(zāi)難。(你怎么知道當(dāng)客戶端或者服務(wù)端阻塞的時(shí)候會(huì)發(fā)生什么?)。

上面的情況看上去會(huì)比較相似,但是它們非常不同:

  • 前者,服務(wù)端響應(yīng)請(qǐng)求(服務(wù)端等待來(lái)自客戶端的請(qǐng)求然后回應(yīng))。這是一個(gè)請(qǐng)求式連接,客戶端從服務(wù)端拉取它需要的東西。
  • 后者,服務(wù)端發(fā)送事件到客戶端然后由客戶端響應(yīng)。這是一個(gè)推式連接,服務(wù)端推送通知/事件到客戶端。

你大部分時(shí)間都在做請(qǐng)求式客戶端/服務(wù)端應(yīng)用,這也是比較簡(jiǎn)單,同時(shí)也是比較常見(jiàn)的。

你可以把拉取請(qǐng)求(客戶端到服務(wù)端)和推送請(qǐng)求(服務(wù)端到客戶端)結(jié)合起來(lái),但是,這是非常復(fù)雜的,所以你最好避免這種情況 。把這兩種方式結(jié)合的問(wèn)題在于:如果你使用“發(fā)送請(qǐng)求-讀取結(jié)果”策略。就會(huì)發(fā)生下面一系列事情:

  • 客戶端寫(xiě)入(發(fā)送請(qǐng)求)
  • 服務(wù)端寫(xiě)入(發(fā)送通知到客戶端)
  • 客戶端讀取服務(wù)端寫(xiě)入的內(nèi)容,然后將其作為請(qǐng)求的結(jié)果進(jìn)行解析
  • 服務(wù)端阻塞以等待客戶端的返回的結(jié)果,這會(huì)在客戶端發(fā)送新請(qǐng)求的時(shí)候發(fā)生
  • 服務(wù)端把發(fā)送過(guò)來(lái)的請(qǐng)求當(dāng)作它等待的結(jié)果進(jìn)行解析
  • 客戶端會(huì)阻塞(服務(wù)端不會(huì)返回任何結(jié)果,因?yàn)樗芽蛻舳说恼?qǐng)求當(dāng)作它通知返回的結(jié)果)

在一個(gè)請(qǐng)求式客戶端/服務(wù)端應(yīng)用中,避免上面的情況是非常簡(jiǎn)單的。你可以通過(guò)實(shí)現(xiàn)一個(gè)ping操作的方式來(lái)模擬一個(gè)推送式請(qǐng)求,我們假設(shè)每5秒鐘客戶端ping一次服務(wù)端。如果沒(méi)有事情需要通知,服務(wù)端返回一個(gè)類(lèi)似ping ok的結(jié)果,如果有事情需要通知,服務(wù)端返回一個(gè)ping [event_name]。然后客戶端就可以初始化一個(gè)新的請(qǐng)求去處理這個(gè)事件。

復(fù)習(xí)一下,第一種情況就是之前章節(jié)中的同步客戶端應(yīng)用,它的主循環(huán)如下:

void loop() {
    // 對(duì)于我們登錄操作的結(jié)果
    write("login " + username_ + "\n");
    read_answer();
    while ( started_) {
        write_request();
        read_answer();
        ...
    } 
} 

我們對(duì)其進(jìn)行修改以適應(yīng)第二種情況:

void loop() {
    while ( started_) {
        read_notification();
        write_answer();
    }
}
void read_notification() {
    already_read_ = 0;
    read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
    process_notification();
}
void process_notification() {
    // ... 看通知是什么,然后準(zhǔn)備回復(fù)
}

服務(wù)端應(yīng)用中的同步I/O

類(lèi)似客戶端,服務(wù)端也被分為兩種情況用來(lái)匹配之前章節(jié)中的情況1和情況2。同樣,兩種情況都采用“發(fā)送請(qǐng)求-讀取結(jié)果”的策略。

這里寫(xiě)圖片描述

第一種情況是我們?cè)谥罢鹿?jié)實(shí)現(xiàn)過(guò)的同步服務(wù)端。當(dāng)你是同步時(shí)讀取一個(gè)完整的請(qǐng)求不是很簡(jiǎn)單,因?yàn)槟阈枰苊庾枞ㄍǔ?lái)說(shuō)是能讀多少就讀多少):

void read_request() {
    if ( sock_.available())
}
already_read_ += sock_.read_some(buffer(buff_ + already_read_, max_msg - already_read_));

只要一個(gè)消息被完整讀到,就對(duì)它進(jìn)行處理然后回復(fù)給客戶端:

void process_request() {
    bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_;
    if ( !found_enter)
        return; // 消息不完整
    size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
    std::string msg(buff_, pos);
    ...
    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else ...
} 

如果我們想讓服務(wù)端變成一個(gè)推送服務(wù)端,我們通過(guò)如下的方式修改:

typedef std::vector<client_ptr> array;
array clients;
array notify;
std::string notify_msg;
void on_new_client() {
    // 新客戶端連接時(shí),我們通知所有客戶端這個(gè)事件
    notify = clients;
    std::ostringstream msg;
    msg << "client count " << clients.size();
    notify_msg = msg.str();
    notify_clients();
}
void notify_clients() {
    for ( array::const_iterator b = notify.begin(), e = notify.end(); b != e; ++b) {
        (*b)->sock_.write_some(notify_msg);
    }
} 

on_new_client()方法是事件之一,這個(gè)事件我們需要通知所有的客戶端。notify_clients是通知所有對(duì)一個(gè)事件感興趣客戶端的方法。它發(fā)送消息但是不等待每個(gè)客戶端返回的結(jié)果,因?yàn)槟菢拥脑捑蜁?huì)導(dǎo)致阻塞。當(dāng)客戶端返回一個(gè)結(jié)果時(shí),客戶端會(huì)告訴我們它為什么回復(fù)(然后我們就可以正確地處理它)。

同步服務(wù)端中的線程

這是一個(gè)非常重要的關(guān)注點(diǎn):我們開(kāi)辟多少線程去處理服務(wù)端請(qǐng)求? 對(duì)于一個(gè)同步服務(wù)端,我們至少需要一個(gè)處理新連接的線程:

void accept_thread() {
    ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
    while ( true) {
        client_ptr new_( new talk_to_client);
        acceptor.accept(new_->sock());
        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    } 
} 

對(duì)于已經(jīng)存在的客戶端:

  • 我們可以是單線程。這是最簡(jiǎn)單的,同時(shí)也是我在第四章 同步服務(wù)端中采用的實(shí)現(xiàn)方式。它可以很輕松地處理100-200并發(fā)的客戶端而且有時(shí)候會(huì)更多,對(duì)于大多數(shù)情況來(lái)說(shuō)這已經(jīng)足夠用了。
  • 我們可以對(duì)每個(gè)客戶端開(kāi)一個(gè)線程。這不是一個(gè)很好的選擇;他會(huì)浪費(fèi)很多線程而且有時(shí)候會(huì)導(dǎo)致調(diào)試?yán)щy,而且當(dāng)它需要處理200以上并發(fā)的客戶端的時(shí)候,它可能馬上會(huì)到達(dá)它的瓶頸。
  • 我們可以用一些固定數(shù)量的線程去處理已經(jīng)存在的客戶端

第三種選擇是同步服務(wù)端中最難實(shí)現(xiàn)的;整個(gè)talk_to_client類(lèi)需要是線程安全的。然后,你需要一個(gè)機(jī)制來(lái)確定哪個(gè)線程處理哪個(gè)客戶端。對(duì)于這個(gè)問(wèn)題,你有兩個(gè)選擇:

  • 將特定的客戶端分配給特定的線程;比如,線程1處理前面20個(gè)客戶端,線程2處理21到40個(gè)線程,等等。當(dāng)一個(gè)線程在使用時(shí)(我們?cè)诘却豢蛻舳俗枞囊恍〇|西),我們從已存在客戶端列表中將其取出來(lái)。等我們處理完之后,再把它放回到列表中。每個(gè)線程都會(huì)循環(huán)遍歷已經(jīng)存在的客戶端列表,然后把擁有完整請(qǐng)求的第一個(gè)客戶端提出來(lái)(我們已經(jīng)從客戶端讀取了一條完整的消息),然后回復(fù)它。
  • 服務(wù)端可能會(huì)變得無(wú)響應(yīng)
    • 第一種情況,被同一個(gè)線程處理的幾個(gè)客戶端同時(shí)發(fā)送請(qǐng)求,因?yàn)橐粋€(gè)線程在同一時(shí)刻只能處理一個(gè)請(qǐng)求。所以這種情況我們什么也不能做。
    • 第二種情況,如果我們發(fā)現(xiàn)并發(fā)請(qǐng)求大于當(dāng)前線程個(gè)數(shù)的時(shí)候。我們可以簡(jiǎn)單地創(chuàng)建新線程來(lái)處理當(dāng)前的壓力。

下面的代碼片段有點(diǎn)類(lèi)似之前的answer_to_client方法,它向我們展示了第二種方法的實(shí)現(xiàn)方式:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
    ...
    void answer_to_client() {
        try {
            read_request();
            process_request();
        } catch ( boost::system::system_error&) { stop(); }
    } 
}; 

我們需要對(duì)它進(jìn)行修改使它變成下面代碼片段的樣子:

struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
    boost::recursive_mutex cs;
    boost::recursive_mutex cs_ask;
    bool in_process;
    void answer_to_client() {
        { boost::recursive_mutex::scoped_lock lk(cs_ask);
            if ( in_process)
                return;
            in_process = true;
        }
        { boost::recursive_mutex::scoped_lock lk(cs);
            try {
                read_request();
                process_request();
            }catch ( boost::system::system_error&) {
                stop();
            }
        }
        { boost::recursive_mutex::scoped_lock lk(cs_ask);
            in_process = false;
        }
    } 
}; 

當(dāng)我們?cè)谔幚硪粋€(gè)客戶端請(qǐng)求的時(shí)候,它的in_process變量被設(shè)置成true,其他的線程就會(huì)忽略這個(gè)客戶端。額外的福利就是handle_clients_thread()方法不需要做任何修改;你可以隨心所欲地創(chuàng)建你想要數(shù)量的handle_clients_thread()方法。

客戶端應(yīng)用中的異步I/O

主流程和同步客戶端應(yīng)用有點(diǎn)類(lèi)似,不同的是Boost.Asio每次都位于async_read和async_write請(qǐng)求中間。

這里寫(xiě)圖片描述

第一種情況是我在第四章 客戶端和服務(wù)端 中實(shí)現(xiàn)過(guò)的。你應(yīng)該還記得在每個(gè)異步操作結(jié)束的時(shí)候,我都啟動(dòng)另外一個(gè)異步操作,這樣service.run()方法才不會(huì)結(jié)束。

為了適應(yīng)第二種情況,你需要使用下面的代碼片段:

void on_connect() {
    do_read();
}
void do_read() {
    async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void on_read(const error_code & err, size_t bytes) {
    if ( err) stop();
    if ( !started() ) return;
    std::string msg(read_buffer_, bytes);
    if ( msg.find("clients") == 0) on_clients(msg);
    else ...
}
void on_clients(const std::string & msg) {
    std::string clients = msg.substr(8);
    std::cout << username_ << ", new client list:" << clients ;
    do_write("clients ok\n");
} 

注意只要我們成功連接上,我們就開(kāi)始從服務(wù)端讀取。每個(gè)on_[event]方法都會(huì)通過(guò)寫(xiě)一個(gè)回復(fù)給服務(wù)端的方式來(lái)結(jié)束我們。

使用異步的美好在于你可以使用Boost.Asio進(jìn)行管理,從而把I/O網(wǎng)絡(luò)操作和其他異步操作結(jié)合起來(lái)。盡管它的流程不像同步的流程那么清晰,你仍然可以用同步的方式來(lái)想象它。

假設(shè),你從一個(gè)web服務(wù)器讀取文件然后把它們保存到一個(gè)數(shù)據(jù)庫(kù)中(異步地)。你可以把這個(gè)過(guò)程想象成下面的流程圖:

這里寫(xiě)圖片描述

服務(wù)端應(yīng)用的異步I/O

現(xiàn)在要展示的是兩個(gè)普遍的情況,情況1(拉?。┖颓闆r2(推送)

這里寫(xiě)圖片描述

第一種情況同樣是我在第4章 客戶端和服務(wù)端 中實(shí)現(xiàn)的異步服務(wù)端。在每一個(gè)異步操作最后,我都會(huì)啟動(dòng)另外一個(gè)異步操作,這樣的話service.run()就不會(huì)結(jié)束。 現(xiàn)在要展示的是被剪裁過(guò)的框架代碼。下面是talk_to_client類(lèi)所有的成員:

void start() {
    ...
    do_read(); // first, we wait for client to login
}
void on_read(const error_code & err, size_t bytes) {
    std::string msg(read_buffer_, bytes);
    if ( msg.find("login ") == 0) on_login(msg);
    else if ( msg.find("ping") == 0) on_ping();
    else
    ...
}
void on_login(const std::string & msg) {
    std::istringstream in(msg);
    in >> username_ >> username_;
    do_write("login ok\n");
}
void do_write(const std::string & msg) {
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) { do_read(); } 

簡(jiǎn)單來(lái)說(shuō),我們始終等待一個(gè)read操作,而且只要一發(fā)生,我們就處理然后將結(jié)果返回給客戶端。

我們把上述代碼進(jìn)行修改就可以完成一個(gè)推送服務(wù)端

void start() {
    ...
    on_new_client_event();
}
void on_new_client_event() {
    std::ostringstream msg;
    msg << "client count " << clients.size();
    for ( array::const_iterator b = clients.begin(), e = clients.end(); (*b)->do_write(msg.str());
} 
void on_read(const error_code & err, size_t bytes) {
    std::string msg(read_buffer_, bytes);
    // 在這里我們基本上只知道我們的客戶端接收到我們的通知
}
void do_write(const std::string & msg) {
    std::copy(msg.begin(), msg.end(), write_buffer_);
    sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
void on_write(const error_code & err, size_t bytes) { do_read(); } 

只要有一個(gè)事件發(fā)生,我們假設(shè)是on_new_client_event,所有需要被通知到的客戶端就都收到一條信息。當(dāng)它們回復(fù)時(shí),我們簡(jiǎn)單認(rèn)為他們已經(jīng)確認(rèn)收到事件。注意我們永遠(yuǎn)不會(huì)把正在等待的異步操作用盡(所以,service.run()不會(huì)結(jié)束),因?yàn)槲覀円恢痹诘却粋€(gè)新的客戶端:

ip::tcp::acceptor acc(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
    client->start();
    talk_to_client::ptr new_client = talk_to_client::new_();
    acc.async_accept(new_client->sock(), bind(handle_accept,new_client,_1));
}

異步服務(wù)端中的多線程

我在第4章 客戶端和服務(wù)端 展示的異步服務(wù)端是單線程的,所有的事情都發(fā)生在main()中:

int main() {
    talk_to_client::ptr client = talk_to_client::new_();
    acc.async_accept(client->sock(), boost::bind(handle_
accept,client,_1));
    service.run();
} 

異步的美妙之處就在于可以非常簡(jiǎn)單地把單線程變?yōu)槎嗑€程。你可以一直保持單線程直到你的并發(fā)客戶端超過(guò)200。然后,你可以使用如下的代碼片段把單線程變成100個(gè)線程:

boost::thread_group threads;
void listen_thread() {
    service.run();
}
void start_listen(int thread_count) {
    for ( int i = 0; i < thread_count; ++i)
        threads.create_thread( listen_thread);
}
int main(int argc, char* argv[]) {
    talk_to_client::ptr client = talk_to_client::new_();
    acc.async_accept(client->sock(), boost::bind(handle_accept,client,_1));
    start_listen(100);
    threads.join_all();
}

當(dāng)然,一旦你選擇了多線程,你需要考慮線程安全。盡管你在線程A中調(diào)用了*async_*,但是它的完成處理流程可以在線程B中被調(diào)用(因?yàn)榫€程B也調(diào)用了service.run())。對(duì)于它本身而言這不是問(wèn)題。只要你遵循邏輯流程,也就是從async_read()on_read(),從on_read()到process_request,從process_requestasync_write(),從async_write()on_write(),從on_write()到async_read(),然后在你的talk_to_client*類(lèi)中也沒(méi)有被調(diào)用的公有方法,這樣的話盡管不同的方法可以在不同的線程中被調(diào)用,它們還是會(huì)被有序地調(diào)用。從而不需要互斥量。

這也意味著對(duì)于一個(gè)客戶端,只會(huì)有一個(gè)異步操作在等待。假如在某些情況,一個(gè)客戶端有兩個(gè)異步方法在等待,你就需要互斥量了。這是因?yàn)閮蓚€(gè)等待的操作可能正好在同一個(gè)時(shí)間完成,然后我們就會(huì)在兩個(gè)不同的線程中間同時(shí)調(diào)用他們的完成處理函數(shù)。所以,這里需要線程安全,也就是需要使用互斥量。 在我們的異步服務(wù)端中,我們確實(shí)同時(shí)有兩個(gè)等待的操作:

void do_read() {
    async_read(sock_, buffer(read_buffer_),MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
    post_check_ping();
}
void post_check_ping() {
    timer_.expires_from_now(boost::posix_time::millisec(5000));
    timer_.async_wait( MEM_FN(on_check_ping));
}

當(dāng)在做一個(gè)read操作時(shí),我們會(huì)異步等待read操作完成和超時(shí)。所以,這里需要線程安全。

我的建議是,如果你準(zhǔn)備使用多線程,從開(kāi)始就保證你的類(lèi)是線程安全的。通常這不會(huì)影響它的性能(當(dāng)然你也可以在配置中設(shè)置開(kāi)關(guān))。同時(shí),如果你準(zhǔn)備使用多線程,從一個(gè)開(kāi)始就使用。這樣的話你能盡早地發(fā)現(xiàn)可能存在的問(wèn)題。一旦你發(fā)現(xiàn)一個(gè)問(wèn)題,你首先需要檢查的事情就是:?jiǎn)尉€程運(yùn)行的時(shí)候是否會(huì)發(fā)生?如果是,它很簡(jiǎn)單;只要調(diào)試它就可以了。否則,你可能忘了對(duì)一些方法加鎖(互斥量)。

因?yàn)槲覀兊睦有枰蔷€程安全的,我已經(jīng)把talk_to_client修改成使用互斥量的了。同時(shí),我們也有一個(gè)客戶端連接的列表,它也需要自己的互斥量,因?yàn)槲覀冇袝r(shí)需要訪問(wèn)它。

避免死鎖和內(nèi)存沖突不是那么容易。下面是我需要對(duì)update_client_changed()方法進(jìn)行修改的地方:

void update_clients_changed() {
    array copy;
    { boost::recursive_mutex::scoped_lock lk(clients_cs); copy = clients; }
    for( array::iterator b = copy.begin(), e = copy.end(); b != e; ++b)
        (*b)->set_clients_changed();
} 

你需要避免的是同時(shí)有兩個(gè)互斥量被鎖定(這會(huì)導(dǎo)致死鎖)。在我們的例子中,我們不想clients_cs和一個(gè)客戶端的cs_互斥量同時(shí)被鎖住

異步操作

Boost.Asio同樣允許你異步地運(yùn)行你任何一個(gè)方法。僅僅需要使用下面的代碼片段:

void my_func() {
    ...
}
service.post(my_func);

這樣就可以保證my_func在調(diào)用了service.run()方法的某個(gè)線程中間被調(diào)用。你同樣可以異步地調(diào)用一個(gè)有完成處理handler的方法,方法的handler會(huì)在方法結(jié)束的時(shí)候通知你。偽代碼如下:

void on_complete() {
    ...
}
void my_func() {
    ...
    service.post(on_complete);
}
async_call(my_func);

沒(méi)有現(xiàn)成的async_call方法,因此,你需要自己創(chuàng)建。幸運(yùn)的是,它不是很復(fù)雜,參考下面的代碼片段:

struct async_op : boost::enable_shared_from_this<async_op>, ... {
    typedef boost::function<void(boost::system::error_code)>completion_func;
    typedef boost::function<boost::system::error_code ()> op_func;
    struct operation { ... };
    void start() {
        { boost::recursive_mutex::scoped_lock lk(cs_);
            if ( started_) return; started_ = true; }
        boost::thread t(boost::bind(&async_op::run,this));
    }
    void add(op_func op, completion_func completion, io_service &service) {
        self_ = shared_from_this();
        boost::recursive_mutex::scoped_lock lk(cs_);
        ops_.push_back( operation(service, op, completion));
        if ( !started_) start();
    } 
    void stop() {
        boost::recursive_mutex::scoped_lock lk(cs_);
        started_ = false; ops_.clear();
    } 
private:
    boost::recursive_mutex cs_;
    std::vector<operation> ops_;
    bool started_;
    ptr self_;
};

async_op方法創(chuàng)建了一個(gè)后臺(tái)線程,這個(gè)線程會(huì)運(yùn)行(run())你添加(add())到它里面的所有的異步操作。為了讓事情簡(jiǎn)單一些,每個(gè)操作都包含下面的內(nèi)容:

  • 一個(gè)異步調(diào)用的方法
  • 當(dāng)?shù)谝粋€(gè)方法結(jié)束時(shí)被調(diào)用的一個(gè)完成處理handler
  • 會(huì)運(yùn)行完成處理handler的io_service實(shí)例。這也是完成時(shí)通知你的地方。參考下面的代碼:

struct async_op : boost::enable_shared_from_this<async_op>, private boost::noncopyable {
    struct operation {
        operation(io_service & service, op_func op, completion_func completion) : service(&service), op(op), completion(completion) , work(new io_service::work(service)) {}
        operation() : service(0) {}
        io_service * service;
        op_func op;
        completion_func completion;
        typedef boost::shared_ptr<io_service::work> work_ptr;
        work_ptr work;
    };
    ... 
}; 

它們被operation結(jié)構(gòu)體包含在內(nèi)部。注意當(dāng)有一個(gè)操作在等待時(shí),我們?cè)诓僮鞯臉?gòu)造方法中構(gòu)造一個(gè)io_service::work實(shí)例,從而保證直到我們完成異步調(diào)用之前service.run()都不會(huì)結(jié)束(當(dāng)io_service::work實(shí)例保持活動(dòng)時(shí),service.run()就會(huì)認(rèn)為它有工作需要做)。參考下面的代碼片段:

struct async_op : ... {
    typedef boost::shared_ptr<async_op> ptr;
    static ptr new_() { return ptr(new async_op); }
    ...
    void run() {
        while ( true) {
            { boost::recursive_mutex::scoped_lock lk(cs_);
                if ( !started_) break; }
            boost::this_thread::sleep(boost::posix_time::millisec(10));
            operation cur;
            { boost::recursive_mutex::scoped_lock lk(cs_);
                if ( !ops_.empty()) {
                    cur = ops_[0]; 
                    ops_.erase(ops_.begin());
                }
            }
            if ( cur.service)
                cur.service->post(boost::bind(cur.completion, cur.op()));        
        }
        self_.reset();
    }
}; 

run()方法就是后臺(tái)線程;它僅僅觀察是否有工作需要做,如果有,就一個(gè)一個(gè)地運(yùn)行這些異步方法。在每個(gè)調(diào)用結(jié)束的時(shí)候,它會(huì)調(diào)用相關(guān)的完成處理方法。

為了測(cè)試,我們創(chuàng)建一個(gè)會(huì)被異步執(zhí)行的compute_file-checksum方法

size_t checksum = 0;
boost::system::error_code compute_file_checksum(std::string file_name)
{
    HANDLE file = ::CreateFile(file_name.c_str(),GENERIC_READ, 0, 0,OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::random_access_handle h(service, file);
    long buff[1024];
    checksum = 0;
    size_t bytes = 0, at = 0;
    boost::system::error_code ec;
    while ( (bytes = read_at(h, at, buffer(buff), ec)) > 0) {
        at += bytes; bytes /= sizeof(long);
        for ( size_t i = 0; i < bytes; ++i)
            checksum += buff[i];
    }
    return boost::system::error_code(0,boost::system::generic_category());
}
void on_checksum(std::string file_name, boost::system::error_code) {
    std::cout << "checksum for " << file_name << "=" << checksum << std::endl;
}
int main(int argc, char* argv[]) {
    std::string fn = "readme.txt";
    async_op::new_()->add( service, boost::bind(compute_file_checksum,fn),boost::bind(on_checksum,fn,_1));
    service.run();
}

注意我展示給你的只是實(shí)現(xiàn)異步調(diào)用一個(gè)方法的一種可能。除了像我這樣實(shí)現(xiàn)一個(gè)后臺(tái)線程,你可以使用一個(gè)內(nèi)部io_service實(shí)例,然后推送(post())異步方法給這個(gè)實(shí)例調(diào)用。這個(gè)作為一個(gè)練習(xí)留給讀者。

你也可以擴(kuò)展這個(gè)類(lèi)讓其可以展示一個(gè)異步操作的進(jìn)度(比如,使用百分比)。這樣做你就可以在主線程通過(guò)一個(gè)進(jìn)度條來(lái)顯示進(jìn)度。

代理實(shí)現(xiàn)

代理一般位于客戶端和服務(wù)端之間。它接受客戶端的請(qǐng)求,可能會(huì)對(duì)請(qǐng)求進(jìn)行修改,然后接著把請(qǐng)求發(fā)送到服務(wù)端。然后從服務(wù)端取回結(jié)果,可能也會(huì)對(duì)結(jié)果進(jìn)行修改,然后接著把結(jié)果發(fā)送到客戶端。

這里寫(xiě)圖片描述

代理有什么特別的?我們講述它的目的在于:對(duì)每個(gè)連接,你都需要兩個(gè)sokect,一個(gè)給客戶端,另外一個(gè)給服務(wù)端。這些都給實(shí)現(xiàn)一個(gè)代理增加了不小的難度。

實(shí)現(xiàn)一個(gè)同步的代理應(yīng)用比異步的方式更加復(fù)雜;數(shù)據(jù)可能同時(shí)從兩個(gè)端過(guò)來(lái)(客戶端和服務(wù)端),也可能同時(shí)發(fā)往兩個(gè)端。這也就意味著如果我們選擇同步,我們就可能在一端向另一端read()或者write(),同時(shí)另一端向這一端read()或者write()時(shí)阻塞,這也就意味著最終我們會(huì)變得無(wú)響應(yīng)。

根據(jù)下面幾條實(shí)現(xiàn)一個(gè)異步代理的簡(jiǎn)單例子:

  • 在我們的方案中,我們?cè)跇?gòu)造函數(shù)中能拿到兩個(gè)連接。但不是所有的情況都這樣,比如對(duì)于一個(gè)web代理來(lái)說(shuō),客戶端只告訴我們服務(wù)端的地址。
  • 因?yàn)楸容^簡(jiǎn)單,所以不是線程安全的。參考如下的代碼:

class proxy  : public boost::enable_shared_from_this<proxy> {
    proxy(ip::tcp::endpoint ep_client, ip::tcp::endpoint ep_server) : ... {}
public:
    static ptr start(ip::tcp::endpoint ep_client,
ip::tcp::endpoint ep_svr) {
        ptr new_(new proxy(ep_client, ep_svr));
        // … 連接到兩個(gè)端
        return new_;
    }
    void stop() {
        // ... 關(guān)閉兩個(gè)連接
    }
    bool started() { return started_ == 2; }
private:
    void on_connect(const error_code & err) {
        if ( !err)      {
            if ( ++started_ == 2) on_start();
        } else stop();
    }
    void on_start() {
        do_read(client_, buff_client_);
        do_read(server_, buff_server_);
    }
... 
private:
    ip::tcp::socket client_, server_;
    enum { max_msg = 1024 };
    char buff_client_[max_msg], buff_server_[max_msg]; 
    int started_; 
};

這是個(gè)非常簡(jiǎn)單的代理。當(dāng)我們兩個(gè)端都連接時(shí),它開(kāi)始從兩個(gè)端讀?。?em>on_start()方法):

class proxy  : public boost::enable_shared_from_this<proxy> {
    ...
    void on_read(ip::tcp::socket & sock, const error_code& err, size_t bytes) {
        char * buff = &sock == &client_ ? buff_client_ : buff_server_;
        do_write(&sock == &client_ ? server_ : client_, buff, bytes);
    }
    void on_write(ip::tcp::socket & sock, const error_code &err, size_t bytes){
        if ( &sock == &client_) do_read(server_, buff_server_);
        else do_read(client_, buff_client_);
    }
    void do_read(ip::tcp::socket & sock, char* buff) {
        async_read(sock, buffer(buff, max_msg), MEM_FN3(read_complete,ref(sock),_1,_2), MEM_FN3(on_read,ref(sock),_1,_2));
    }
    void do_write(ip::tcp::socket & sock, char * buff, size_t size) {
        sock.async_write_some(buffer(buff,size), MEM_FN3(on_write,ref(sock),_1,_2));
    }
    size_t read_complete(ip::tcp::socket & sock, const error_code & err, size_t bytes) {
        if ( sock.available() > 0) return
        sock.available();
        return bytes > 0 ? 0 : 1;
    }
}; 

對(duì)每一個(gè)成功的讀取操作(on_read),它都會(huì)發(fā)送消息到另外一個(gè)部分。只要消息一發(fā)送成功(on_write),我們就從來(lái)源那部分再次讀取。

使用下面的代碼片段讓這個(gè)流程運(yùn)轉(zhuǎn)起來(lái):

int main(int argc, char* argv[]) {
    ip::tcp::endpoint ep_c(ip::address::from_string("127.0.0.1"),8001);
    ip::tcp::endpoint ep_s(ip::address::from_string("127.0.0.1"),8002);
    proxy::start(ep_c, ep_s);
    service.run();
} 

你會(huì)注意到我在讀和寫(xiě)中重用了buffer。這個(gè)重用是ok的,因?yàn)閺目蛻舳俗x取到的消息在新消息被讀取之前就已經(jīng)寫(xiě)入到服務(wù)端,反之亦然。這也意味著這種特別的實(shí)現(xiàn)方式會(huì)碰到響應(yīng)性的問(wèn)題。當(dāng)我們正在處理到B部分的寫(xiě)入時(shí),我們不會(huì)從A讀?。ㄎ覀儠?huì)在寫(xiě)入到B部分完成時(shí)重新從A部分讀?。?。你可以通過(guò)下面的方式重寫(xiě)實(shí)現(xiàn)來(lái)克服這個(gè)問(wèn)題:

  • 使用多個(gè)讀取buffer
  • 對(duì)每個(gè)成功的read操作,除了異步寫(xiě)回到另外一個(gè)部分,還需要做額外的一個(gè)read(讀取到一個(gè)新的buffer)
  • 對(duì)每個(gè)成功的write操作,銷(xiāo)毀(或者重用)這個(gè)buffer

我會(huì)把這個(gè)當(dāng)作練習(xí)留給你們。

小結(jié)

在選擇同步或者異步時(shí)需要考慮很多事情。最先需要考慮的就是避免混淆它們。

在這一章中,我們已經(jīng)看到:

  • 實(shí)現(xiàn),測(cè)試,調(diào)試各個(gè)類(lèi)型的應(yīng)用是多么簡(jiǎn)單
  • 線程是如何影響你的應(yīng)用的
  • 應(yīng)用的行為是怎么影響它的實(shí)現(xiàn)的(拉取或者推送類(lèi)型)
  • 選擇異步時(shí)怎樣去嵌入自己的異步操作

接下來(lái),我們會(huì)了解一些Boost.Asio不那么為人知曉的特性,中間就有我最喜歡的Boost.Asio特性-協(xié)程,它可以讓你輕松地取異步之精華,去異步之糟粕。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)