Using ThreadPoolExecutor with asyncio for blocking IO requests
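In this section we look at how to combine a thread pool with asyncio.
Even when writing coroutines we still sometimes need threads. When exactly?
We know that blocking IO must not be called inside a coroutine, because a blocking call stalls the whole event loop and every other coroutine with it. Sometimes, though, a blocking IO operation is unavoidable (for example, when a library only offers a blocking API). In that case we fall back on threads: whenever we need to integrate blocking IO into coroutine code, we run it in a thread pool.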
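To make the problem concrete, here is a minimal sketch. The names blocking_io, in_loop, offloaded and timed are hypothetical, and time.sleep merely stands in for a blocking socket call. Calling the blocking function directly inside a coroutine serializes everything, while loop.run_in_executor lets the calls overlap in worker threads.

```python
import asyncio
import time


def blocking_io(n):
    # Hypothetical stand-in for blocking IO: sleeps in the calling thread
    time.sleep(0.2)
    return n * 2


async def in_loop(n):
    # Called directly, the blocking call freezes the whole event loop
    return blocking_io(n)


async def offloaded(n):
    loop = asyncio.get_running_loop()
    # executor=None means "use the loop's default ThreadPoolExecutor"
    return await loop.run_in_executor(None, blocking_io, n)


async def timed(make_coro, count=5):
    # Run `count` coroutines concurrently and measure the wall-clock time
    start = time.perf_counter()
    results = await asyncio.gather(*(make_coro(i) for i in range(count)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print(asyncio.run(timed(in_loop)))    # expect about 1.0 s: calls run one after another
    print(asyncio.run(timed(offloaded)))  # expect about 0.2 s: calls overlap in threads
```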
import asyncio
import socket
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def get_url(url):
    # Fetch the HTML over a plain, blocking socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # Open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # connect() blocks this worker thread until the connection is up;
    # a thread blocked on IO does not consume CPU
    # (with a non-blocking socket we would instead have to poll in a while loop
    # until the connection is ready, doing computation or starting other
    # connections in the meantime)
    client.connect((host, 80))
    client.sendall("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    # Everything after the first blank line is the body
    html_data = data.split("\r\n\r\n", 1)[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.new_event_loop()
    # Create the thread-pool executor
    executor = ThreadPoolExecutor()
    # The pool's concurrency can also be capped explicitly, e.g.:
    # executor = ThreadPoolExecutor(max_workers=10)
    # Issue 20 requests concurrently
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # Run the blocking function in the pool; this returns an asyncio future
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("last time:{}".format(time.time() - start_time))

Output:
last time:2.110485076904297
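The code above creates a thread pool and has the blocking function run inside it, while the event loop waits on the futures that come back.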
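The commented-out line in the listing hints that the pool's size can be capped. The sketch below (slow_io and run_batch are hypothetical, and time.sleep replaces a real socket request) shows the effect of max_workers: with one worker, four 0.2-second calls queue up behind each other; with four workers they run side by side.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_io(i):
    # Hypothetical blocking call standing in for get_url()
    time.sleep(0.2)
    return i


async def run_batch(max_workers, count=4):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each call returns an asyncio future wrapping the pool's future
        futures = [loop.run_in_executor(executor, slow_io, i) for i in range(count)]
        results = await asyncio.gather(*futures)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for workers in (1, 4):
        results, elapsed = asyncio.run(run_batch(workers))
        print(workers, results, round(elapsed, 1))
```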
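Let's look at the source. The excerpts below are run_in_executor from asyncio's base_events.py and wrap_future from futures.py, as found in an older CPython release (newer versions differ in details):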
def run_in_executor(self, executor, func, *args):
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    if executor is None:
        executor = self._default_executor
        # Even if we never created an executor, the loop creates a default one itself
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor()
            self._default_executor = executor
    # Finally, submit the blocking call to the pool and return an asyncio future
    # that wraps the pool's concurrent.futures.Future
    return futures.wrap_future(executor.submit(func, *args), loop=self)


def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object."""
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    if loop is None:
        loop = events.get_event_loop()
    new_future = loop.create_future()
    # Link the two futures: when the pool's future finishes, its result or
    # exception is copied into the asyncio future
    _chain_future(future, new_future)
    return new_future
So whenever coroutine code needs to call blocking IO, we can push that call into a thread pool in this way.
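The source shows that run_in_executor is essentially executor.submit() followed by wrap_future(). The sketch below performs both steps by hand with the public asyncio.wrap_future function and compares the outcome with run_in_executor; blocking_square and compare are hypothetical helpers.

```python
import asyncio
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def blocking_square(x):
    # Report which thread ran the call as well as the result
    return x * x, threading.current_thread().name


async def compare():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as executor:
        # Step 1: submit to the pool -> concurrent.futures.Future
        cf = executor.submit(blocking_square, 7)
        # Step 2: wrap it -> asyncio.Future that completes when cf does
        af = asyncio.wrap_future(cf, loop=loop)
        by_hand = await af
        # run_in_executor performs the same two steps for us
        via_loop = await loop.run_in_executor(executor, blocking_square, 8)
    return isinstance(cf, Future), isinstance(af, asyncio.Future), by_hand, via_loop


if __name__ == "__main__":
    print(asyncio.run(compare()))
```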
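Simulating HTTP requests with asyncio
In asyncio, anything asynchronous is backed by a future: wherever an asynchronous operation happens, a future is created to hold its eventual result.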
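As a tiny illustration of that idea (a sketch, not how open_connection is actually written), a coroutine can create a bare future, arrange for a later event to fill in its result, and await it. Here loop.call_later plays the role of the IO completing, and wait_for_fake_io is a hypothetical name.

```python
import asyncio


async def wait_for_fake_io():
    loop = asyncio.get_running_loop()
    # An empty result container owned by this loop
    fut = loop.create_future()
    # Pretend an IO operation finishes 50 ms from now and delivers its data
    loop.call_later(0.05, fut.set_result, b"payload")
    # The coroutine is suspended here until set_result() is called
    return await fut


if __name__ == "__main__":
    print(asyncio.run(wait_for_fake_io()))  # b'payload'
```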
import asyncio
from urllib.parse import urlparse


async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    # Open the socket connection the coroutine way; this returns a
    # (StreamReader, StreamWriter) pair
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    await writer.drain()
    all_lines = []
    # Iterating over the reader yields the response line by line until EOF
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    writer.close()
    # Each line already ends with "\r\n", so join without adding separators
    html = "".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # Wrap each coroutine in a task (a Future subclass) and collect it
        tasks.append(asyncio.ensure_future(get_url(url)))
    # as_completed yields awaitables in the order the tasks finish
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))


# Alternative entry point (use it instead of the one above): create the
# tasks directly and wait for all of them without a main() coroutine
if __name__ == "__main__":
    import time
    start_time = time.time()
    loop = asyncio.new_event_loop()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        # asyncio.wait() needs tasks/futures; bare coroutines are rejected since Python 3.11
        tasks.append(loop.create_task(get_url(url)))
    loop.run_until_complete(asyncio.wait(tasks))
    print('last time:{}'.format(time.time() - start_time))
The overall flow is exactly the same as the version we implemented earlier.
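Since the example above depends on an external site, here is a self-contained variant of the same pattern that talks to a throwaway HTTP server on 127.0.0.1. The handler, the fixed "hello" body, the OS-chosen port and the helper names are all assumptions made for this demo. It also awaits writer.drain() and reads the whole response with reader.read().

```python
import asyncio

RESPONSE = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\nConnection: close\r\n\r\nhello"


async def handle(reader, writer):
    # Hypothetical demo server: read the request headers, send a fixed reply, hang up
    await reader.readuntil(b"\r\n\r\n")
    writer.write(RESPONSE)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def get_body(host, port, path):
    reader, writer = await asyncio.open_connection(host, port)
    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)
    writer.write(request.encode("utf8"))
    await writer.drain()
    # Read until the server closes the connection (EOF)
    raw = await reader.read()
    writer.close()
    await writer.wait_closed()
    # Headers and body are separated by the first blank line
    return raw.decode("utf8").split("\r\n\r\n", 1)[1]


async def fetch_all(count=5):
    # Port 0 asks the OS for any free port
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        tasks = [asyncio.ensure_future(get_body("127.0.0.1", port, "/goods/{}/".format(i)))
                 for i in range(count)]
        # Collect bodies in completion order, as main() does above
        return [await task for task in asyncio.as_completed(tasks)]


if __name__ == "__main__":
    print(asyncio.run(fetch_all()))
```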
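future and task
A future is a result container: the result of an operation is placed in the future, and once the future is done it runs its registered callbacks, much like the future returned by a thread pool. Task is a subclass of Future.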
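A short sketch of both points: a callback registered with add_done_callback receives the finished future once the result is in, and a task created with asyncio.create_task is indeed an asyncio.Future instance. compute and demo are hypothetical names.

```python
import asyncio


async def compute():
    await asyncio.sleep(0.01)
    return 42


async def demo():
    seen = []
    task = asyncio.create_task(compute())
    # The callback is handed the finished task (a future) itself
    task.add_done_callback(lambda fut: seen.append(fut.result()))
    result = await task
    # Give any callbacks queued with call_soon a chance to run
    await asyncio.sleep(0)
    return isinstance(task, asyncio.Future), result, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, 42, [42])
```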
Let's look at one method in particular, set_result, together with the _schedule_callbacks helper it calls:
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon_threadsafe().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # ... other attributes and methods omitted ...

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        # The result is stored; now schedule the callbacks
        self._schedule_callbacks()

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        # Everything runs in a single thread, so each callback is queued on
        # the loop with call_soon; the loop later takes it off the queue and
        # runs it. Apart from that, this works much like the thread-pool future.
        for callback in callbacks:
            self._loop.call_soon(callback, self)
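Because _schedule_callbacks uses call_soon, set_result() does not run callbacks immediately; they run on a later iteration of the loop. The sketch below (demo is a hypothetical name) checks this, and also that a second set_result() raises InvalidStateError, as the source shows.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    calls = []
    fut.add_done_callback(lambda f: calls.append(f.result()))

    fut.set_result("ok")
    right_after = list(calls)   # still empty: the callback was only queued
    await asyncio.sleep(0)      # yield once so the loop can run its queue
    after_yield = list(calls)

    try:
        fut.set_result("again")  # the state is no longer PENDING
        second = "accepted"
    except asyncio.InvalidStateError:
        second = "InvalidStateError"
    return right_after, after_yield, second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([], ['ok'], 'InvalidStateError')
```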
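Why do we need a Task object at all?
A task is in fact the key bridge between coroutines and futures.
Let's look at the concrete code.
Recall that after defining a (generator-based) coroutine, we must call next() or send(None) on it once before we can drive it; this primes the coroutine so that it is paused at its first yield.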
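A minimal generator-based coroutine shows why priming is needed (averager and try_without_priming are hypothetical examples): until the first next() or send(None) runs it up to its first yield, it cannot accept a value.

```python
def averager():
    total = 0
    count = 0
    average = None
    while True:
        # Hand back the current average, then wait for the next number
        value = yield average
        total += value
        count += 1
        average = total / count


def try_without_priming():
    gen = averager()
    try:
        gen.send(10)  # not started yet: nothing is waiting at a yield
    except TypeError as exc:
        return str(exc)


if __name__ == "__main__":
    print(try_without_priming())  # can't send non-None value to a just-started generator
    avg = averager()
    next(avg)                     # prime: run up to the first yield
    print(avg.send(10))           # 10.0
    print(avg.send(20))           # 15.0
```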

From the source we can see that when a Task is initialized it schedules its _step method (through loop.call_soon), and _step does two essential things.
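The effect of a Task scheduling its first step is easy to observe (worker and demo are hypothetical names). Calling the coroutine function alone runs nothing, and even after asyncio.create_task the body only starts once the current coroutine yields to the event loop.

```python
import asyncio


async def worker(log):
    log.append("started")
    return "done"


async def demo():
    log = []
    coro = worker(log)          # just a coroutine object: its body has not run
    coro.close()                # discard it without ever starting it
    untouched = list(log)

    task = asyncio.create_task(worker(log))  # schedules the first step with call_soon
    before_yield = list(log)    # still empty
    await asyncio.sleep(0)      # let the loop run the scheduled step
    after_yield = list(log)
    return untouched, before_yield, after_yield, await task


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([], [], ['started'], 'done')
```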
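The first is starting the coroutine:
Coroutines differ from threads here: a coroutine has to be explicitly started, whereas a thread does not, because threads are scheduled by the operating system. Coroutines are scheduled by the programmer, so we have to solve the problem of starting them ourselves. To do that, the Task abstraction was introduced, and it starts the coroutine as part of its initialization.
The second is storing the coroutine's return value as the result:
When the coroutine finishes, it raises StopIteration, whose value attribute carries the coroutine's return value; the task catches it and calls set_result to store that value. Threads have no StopIteration: a function run in a thread pool simply returns.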
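Both jobs can be reproduced by hand with send(None): the first call starts the coroutine, and when it returns, the return value arrives as StopIteration.value. The suspend helper (built with types.coroutine), compute and drive are hypothetical, chosen only to show the mechanism.

```python
import types


@types.coroutine
def suspend():
    # A bare yield hands control back to whoever is driving the coroutine
    yield


async def compute():
    await suspend()
    return 42


def drive(coro):
    """Run a coroutine to completion; return (return value, number of suspensions)."""
    suspensions = 0
    while True:
        try:
            coro.send(None)  # the first send(None) starts the coroutine
            suspensions += 1
        except StopIteration as exc:
            # The coroutine returned: this value is what Task passes to set_result()
            return exc.value, suspensions


if __name__ == "__main__":
    print(drive(compute()))  # (42, 1)
```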
In short, the Task object was created to keep the coroutine interface consistent with the thread interface, absorbing the differences between the two.
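Putting the pieces together, here is a deliberately simplified, hypothetical MiniTask that subclasses asyncio.Future and drives a coroutine. It is not asyncio's real Task, which also handles cancellation, contexts and many edge cases, but it shows the bridge: the first step is scheduled with call_soon, each future the coroutine awaits gets a done-callback that resumes it, and StopIteration.value becomes the task's result.

```python
import asyncio


class MiniTask(asyncio.Future):
    """Hypothetical, heavily simplified Task (not asyncio's implementation)."""

    def __init__(self, coro, *, loop):
        super().__init__(loop=loop)
        self._coro = coro
        # Start the coroutine on the next loop iteration, like Task does
        loop.call_soon(self._step)

    def _step(self):
        try:
            # Start or resume the coroutine; it runs until its next suspension
            yielded = self._coro.send(None)
        except StopIteration as exc:
            # The coroutine returned: its return value becomes our result
            self.set_result(exc.value)
        except Exception as exc:
            self.set_exception(exc)
        else:
            if yielded is None:
                # Bare yield (e.g. asyncio.sleep(0)): just resume soon
                self.get_loop().call_soon(self._step)
            else:
                # Waiting on a future: resume the coroutine when that future is done
                yielded.add_done_callback(self._wakeup)

    def _wakeup(self, fut):
        # The awaited future is done; resuming lets the coroutine read its result
        self._step()


async def add_later(a, b):
    await asyncio.sleep(0.01)  # yields a real asyncio future
    await asyncio.sleep(0)     # yields None
    return a + b


def run_mini_task():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(MiniTask(add_later(1, 2), loop=loop))
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_mini_task())  # 3
```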
The figure in the previous article illustrates the code above.
