boost Strands Internal

我對boost.asio的使用不深入,所以總是過了一段時(shí)間就會(huì)對它的理解上又變得模糊起來。
還是先回頭看看boost.asio的一些基礎(chǔ)知識(shí)。

ASIO攻破?。。?/a>這篇文章有一個(gè)蠻不錯(cuò)的比喻:

其實(shí)io_service就像是勞工中介所,而一個(gè)線程就是一個(gè)勞工,調(diào)用post的模塊相當(dāng)于富人們,他們?nèi)ブ薪樗腥蝿?wù),而勞工們就聽候中介所的調(diào)遣去執(zhí)行這些任務(wù),任務(wù)的內(nèi)容就寫在富人們給你的handler上,也就是函數(shù)指針,指針指向具體實(shí)現(xiàn)就是任務(wù)的實(shí)質(zhì)內(nèi)容。其實(shí)在整個(gè)過程中,富人們都不知道是哪個(gè)勞工幫他們做的工作,只知道是中介所負(fù)責(zé)完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個(gè)不恰當(dāng)?shù)牡胤?,如果硬要這樣比喻的話,我只能說:其實(shí)勞工里面也有很多富人的o! 。很多勞工在完成任務(wù)的過程中自己也托給中介所一些任務(wù),然后這些任務(wù)很可能還是自己去完成。這也難怪,運(yùn)行代碼的總是這些線程,那么調(diào)用post的肯定也會(huì)有這些線程了,不過不管怎么說,如此循環(huán)往復(fù)可以解決問題就行。

多說一句,允許有多個(gè)勞工,也就允許有多個(gè)中介所。換句話說,你可以實(shí)現(xiàn)1+n模式(讓一個(gè)中介所為多個(gè)勞工服務(wù)),也可以實(shí)現(xiàn)為n*(1+1)模式(一個(gè)中介所只為一個(gè)勞工服務(wù),有多個(gè)這樣的搭配),當(dāng)然還可以實(shí)現(xiàn)為n+m模式(多個(gè)中介所共同為多個(gè)勞工服務(wù))。

對于n*(1+1)模式,其實(shí)相當(dāng)于對那一部分?jǐn)?shù)據(jù)實(shí)現(xiàn)了單線程處理模型,類似于一個(gè)spsc隊(duì)列。這里暫不討論。

對于1+n模式和n+m模式,情況會(huì)復(fù)雜一些。
boost asio io_service與 strand 分析 一文的例子拿來用,他采用了n+m模式(2+3)來驗(yàn)證strand的作用。這種模式也就是官方文檔中說的線程池模型:

Multiple threads may call the run()
function to set up a pool of threads from which the io_service
may execute handlers. All threads that are waiting in the pool are equivalent and the io_service
may choose any one of them to invoke a handler.

首先是使用了strand的版本。

#include <iostream>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace boost;
using namespace asio;

typedef boost::asio::io_service ioType;
typedef boost::asio::strand strandType;
ioType m_service;
strandType m_strand(m_service);
boost::mutex m_mutex;


void print( int fatherID)
{
//  boost::mutex::scoped_lock lock(m_mutex);
  static int count = 0;
  cout<<"fatherID "<<fatherID<<" "<<endl;
  sleep(1);
  cout<<"count "<<count++<<endl;
}

void ioRun1()
{
  while(1)
    {
      m_service.run();
    }
}
//
void ioRun2()
{
  while(1)
    {
      m_service.run();
    }
}

void print1()
{
  m_strand.dispatch(bind(print,1));
//  cout<<"over"<<endl;
}

void print2()
{
  m_strand.post(bind(print,2));
}

void print3()
{
  m_strand.post(bind(print,3));
}

int main()
{
  boost::thread t0(ioRun1);
  boost::thread t(ioRun2);
  boost::thread t1(print1);
  boost::thread t2(print2);
  boost::thread t3(print3);
  cout<<"111111"<<endl;
  t1.join();
  t2.join();
  t3.join();
  t0.join();
  t.join();
  cout<<"ads"<<endl;
  return 0;
}

最終輸出結(jié)果:

   fatherID 1 
 count 0
 fatherID 2 
 count 1
 fatherID 3 
   count 2

說明這是線程安全的!

但是 而 io_service 不能保證:更改程序:

void print1()
{
  m_service.dispatch(bind(print,1));
//  cout<<"over"<<endl;
}

void print2()
{
  m_service.post(bind(print,2));
}

void print3()
{
  m_service.post(bind(print,3));
}

輸出結(jié)果:

fatherID 3 
fatherID 2 
count 0
fatherID 1 
count 1
count 2

很顯然,這里存在并發(fā)的問題。所以strand就是為了解決這樣的問題而出現(xiàn)的。

How strands guarantee correct execution of pending events in boost.asio一文的這個(gè)回答說:

The strand manages all handlers posted to it in a FIFO queue. When the queue is empty and a handler is posted to the strand, then the strand will post an internal handle to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue, executed, and then if the queue is not empty, the internal handler posts itself back to the io_service.

翻譯過來,就是說strand使用一個(gè)FIFO隊(duì)列來管理handlers(也就是函數(shù)對象)。

asio::async_write and strand的問題是,
asio::async_write(m_socket, asio::buffer(buf, bytes),
custom_alloc(m_strand.wrap(custom_alloc(_OnSend))));

Does this code guarantee that all asynchronous operation handlers(calls to async_write_some) inside async_write are called through strand? (or it's just for my_handler?)

Tanner Sansbury的回答很細(xì)致:
With the following code:

asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));

For this composed operation, all calls to stream.async_write_some() will be invoked within m_strand if all of the following conditions are true:

  • The initiating async_write(...) call is running within m_strand():

     assert(m_strand.running_in_this_thread());
     asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
    
  • The return type from custom_alloc is either:

  • the exact type returned from strand::wrap()

        template <typename Handler> 
        Handler custom_alloc(Handler) { ... }
        template <class Handler>
        class custom_handler
        {
        public:
          custom_handler(Handler handler)
            : handler_(handler)
          {}

          template <class... Args>
          void operator()(Args&&... args)
          {
            handler_(std::forward<Args>(args)...);
          }

          template <typename Function>
          friend void asio_handler_invoke(
            Function intermediate_handler,
            custom_handler* my_handler)
          {
            // Support chaining custom strategies incase the wrapped handler
            // has a custom strategy of its own.
            using boost::asio::asio_handler_invoke;
            asio_handler_invoke(intermediate_handler, &my_handler->handler_);
          }

        private:
          Handler handler_;
        };

        template <typename Handler>
        custom_handler<Handler> custom_alloc(Handler handler)
        {
          return {handler};
        }

See this answer for more details on strands, and this answer for details on asio_handler_invoke.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容