我對boost.asio的使用不深入,所以總是過了一段時(shí)間就會(huì)對它的理解上又變得模糊起來。
還是先回頭看看boost.asio的一些基礎(chǔ)知識(shí)。
ASIO攻破?。。?/a>這篇文章有一個(gè)蠻不錯(cuò)的比喻:
其實(shí)io_service就像是勞工中介所,而一個(gè)線程就是一個(gè)勞工,調(diào)用post的模塊相當(dāng)于富人們,他們?nèi)ブ薪樗腥蝿?wù),而勞工們就聽候中介所的調(diào)遣去執(zhí)行這些任務(wù),任務(wù)的內(nèi)容就寫在富人們給你的handler上,也就是函數(shù)指針,指針指向具體實(shí)現(xiàn)就是任務(wù)的實(shí)質(zhì)內(nèi)容。其實(shí)在整個(gè)過程中,富人們都不知道是哪個(gè)勞工幫他們做的工作,只知道是中介所負(fù)責(zé)完成這些就可以了。這使得邏輯上的耦合降到了最低。不過這樣的比喻也有個(gè)不恰當(dāng)?shù)牡胤?,如果硬要這樣比喻的話,我只能說:其實(shí)勞工里面也有很多富人的o! 。很多勞工在完成任務(wù)的過程中自己也托給中介所一些任務(wù),然后這些任務(wù)很可能還是自己去完成。這也難怪,運(yùn)行代碼的總是這些線程,那么調(diào)用post的肯定也會(huì)有這些線程了,不過不管怎么說,如此循環(huán)往復(fù)可以解決問題就行。
多說一句,允許有多個(gè)勞工,也就允許有多個(gè)中介所。換句話說,你可以實(shí)現(xiàn)1+n模式(讓一個(gè)中介所為多個(gè)勞工服務(wù)),也可以實(shí)現(xiàn)為n*(1+1)模式(一個(gè)中介所只為一個(gè)勞工服務(wù),有多個(gè)這樣的搭配),當(dāng)然還可以實(shí)現(xiàn)為n+m模式(多個(gè)中介所共同為多個(gè)勞工服務(wù))。
對于n*(1+1)模式,其實(shí)相當(dāng)于對那一部分?jǐn)?shù)據(jù)實(shí)現(xiàn)了單線程處理模型,類似于一個(gè)spsc隊(duì)列。這里暫不討論。
對于1+n模式和n+m模式,情況會(huì)復(fù)雜一些。
把boost asio io_service與 strand 分析 一文的例子拿來用,他采用了n+m模式(2+3)來驗(yàn)證strand的作用。這種模式也就是官方文檔中說的線程池模型:
Multiple threads may call the run()
function to set up a pool of threads from which the io_service
may execute handlers. All threads that are waiting in the pool are equivalent and the io_service
may choose any one of them to invoke a handler.
首先是使用了strand的版本。
#include <iostream>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace boost;
using namespace asio;
typedef boost::asio::io_service ioType;
typedef boost::asio::strand strandType;
ioType m_service;
strandType m_strand(m_service);
boost::mutex m_mutex;
void print( int fatherID)
{
// boost::mutex::scoped_lock lock(m_mutex);
static int count = 0;
cout<<"fatherID "<<fatherID<<" "<<endl;
sleep(1);
cout<<"count "<<count++<<endl;
}
void ioRun1()
{
while(1)
{
m_service.run();
}
}
//
void ioRun2()
{
while(1)
{
m_service.run();
}
}
void print1()
{
m_strand.dispatch(bind(print,1));
// cout<<"over"<<endl;
}
void print2()
{
m_strand.post(bind(print,2));
}
void print3()
{
m_strand.post(bind(print,3));
}
int main()
{
boost::thread t0(ioRun1);
boost::thread t(ioRun2);
boost::thread t1(print1);
boost::thread t2(print2);
boost::thread t3(print3);
cout<<"111111"<<endl;
t1.join();
t2.join();
t3.join();
t0.join();
t.join();
cout<<"ads"<<endl;
return 0;
}
最終輸出結(jié)果:
fatherID 1
count 0
fatherID 2
count 1
fatherID 3
count 2
說明這是線程安全的!
但是 而 io_service 不能保證:更改程序:
void print1()
{
m_service.dispatch(bind(print,1));
// cout<<"over"<<endl;
}
void print2()
{
m_service.post(bind(print,2));
}
void print3()
{
m_service.post(bind(print,3));
}
輸出結(jié)果:
fatherID 3
fatherID 2
count 0
fatherID 1
count 1
count 2
很顯然,這里存在并發(fā)的問題。所以strand就是為了解決這樣的問題而出現(xiàn)的。
How strands guarantee correct execution of pending events in boost.asio一文的這個(gè)回答說:
The strand manages all handlers posted to it in a
FIFOqueue. When the queue is empty and a handler is posted to the strand, then the strand will post an internal handle to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue, executed, and then if the queue is not empty, the internal handler posts itself back to the io_service.
翻譯過來,就是說strand使用一個(gè)FIFO隊(duì)列來管理handlers(也就是函數(shù)對象)。
asio::async_write and strand的問題是,
asio::async_write(m_socket, asio::buffer(buf, bytes),
custom_alloc(m_strand.wrap(custom_alloc(_OnSend))));
Does this code guarantee that all asynchronous operation handlers(calls to async_write_some) inside async_write are called through strand? (or it's just for my_handler?)
Tanner Sansbury的回答很細(xì)致:
With the following code:
asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...)));
For this composed operation, all calls to stream.async_write_some() will be invoked within m_strand if all of the following conditions are true:
-
The initiating
async_write(...)call is running withinm_strand():assert(m_strand.running_in_this_thread()); asio::async_write(stream, ..., custom_alloc(m_strand.wrap(...))); The return type from
custom_allocis either:the exact type returned from
strand::wrap()
template <typename Handler>
Handler custom_alloc(Handler) { ... }
- a custom handler that appropriate chains invocations of
asio_handler_invoke():
template <class Handler>
class custom_handler
{
public:
custom_handler(Handler handler)
: handler_(handler)
{}
template <class... Args>
void operator()(Args&&... args)
{
handler_(std::forward<Args>(args)...);
}
template <typename Function>
friend void asio_handler_invoke(
Function intermediate_handler,
custom_handler* my_handler)
{
// Support chaining custom strategies incase the wrapped handler
// has a custom strategy of its own.
using boost::asio::asio_handler_invoke;
asio_handler_invoke(intermediate_handler, &my_handler->handler_);
}
private:
Handler handler_;
};
template <typename Handler>
custom_handler<Handler> custom_alloc(Handler handler)
{
return {handler};
}
See this answer for more details on strands, and this answer for details on asio_handler_invoke.