協(xié)程是在 Tornado 中編寫(xiě)異步代碼的推薦方式。協(xié)程使用 Python的?await
?或?yield
?關(guān)鍵字來(lái)暫停和恢復(fù)執(zhí)行,而不是一連串的回調(diào)(在gevent等框架中看到的協(xié)作輕量級(jí)線程有時(shí)也稱(chēng)為協(xié)程,但在 Tornado 中,所有協(xié)程都使用顯式上下文切換并被稱(chēng)為異步函數(shù))
協(xié)程幾乎和同步代碼一樣簡(jiǎn)單,但沒(méi)有線程的開(kāi)銷(xiāo)。它們還通過(guò)減少可能發(fā)生上下文切換的位置數(shù)量,使并發(fā)更容易推理。
例子:
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
Python 3.5 引入了?async
?和?await
?關(guān)鍵字(使用這些關(guān)鍵字的函數(shù)也稱(chēng)為“本機(jī)協(xié)程”)。為了與舊版本的 Python 兼容,您可以使用裝飾器使用“裝飾”或“基于產(chǎn)量”的協(xié)程tornado.gen.coroutine
盡可能推薦使用原生協(xié)程。僅在需要與舊版本的 Python 兼容時(shí)才使用修飾的協(xié)程。Tornado 文檔中的示例通常會(huì)使用原生形式。
兩種形式之間的轉(zhuǎn)換通常很簡(jiǎn)單:
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
下面概述了兩種形式的協(xié)程之間的其他差異:
原生協(xié)程:
async for
?和?async with
?語(yǔ)句使某些模式更簡(jiǎn)單。 yield
?和?await
?他們,否則根本不要運(yùn)行。裝飾協(xié)程一旦被調(diào)用就可以開(kāi)始“在后臺(tái)”運(yùn)行。請(qǐng)注意,對(duì)于這兩種協(xié)程,使用?await
?或?yield
?是很重要的,這樣任何異常都有可能發(fā)生。裝飾協(xié)程:
executor.submit
?直接產(chǎn)生結(jié)果。對(duì)于本機(jī)協(xié)程,請(qǐng)?IOLoop.run_in_executor?改用本節(jié)解釋裝飾協(xié)程的操作。原生協(xié)程在概念上相似,但由于與 Python 運(yùn)行時(shí)的額外集成而稍微復(fù)雜一些
包含的函數(shù)?yield
?是生成器。所有生成器都是異步的;當(dāng)被調(diào)用時(shí),它們返回一個(gè)生成器對(duì)象,而不是運(yùn)行到完成。裝飾器?@gen.coroutine
?通過(guò)?yield
?表達(dá)式與生成器通信,并通過(guò)返回一個(gè)Future
這是協(xié)程裝飾器內(nèi)部循環(huán)的簡(jiǎn)化版本:
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
裝飾器Future從生成器接收 a,等待(不阻塞)Future完成,然后“解包”并將結(jié)果作為表達(dá)式Future 的結(jié)果發(fā)送回生成器 。?yield
?大多數(shù)異步代碼從不直接接觸類(lèi),除非立即將Future異步函數(shù)返回的值傳遞給?yield
?表達(dá)式。
協(xié)程不會(huì)以正常方式引發(fā)異常:它們引發(fā)的任何異常都將被困在等待對(duì)象中,直到它被產(chǎn)生。這意味著以正確的方式調(diào)用協(xié)程很重要,否則您可能會(huì)遇到未被注意到的錯(cuò)誤:
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
在幾乎所有情況下,任何調(diào)用協(xié)程的函數(shù)都必須是協(xié)程本身,并且在調(diào)用中使用?await
?或者?yield
?關(guān)鍵字。當(dāng)您覆蓋類(lèi)中定義的方法時(shí),請(qǐng)查閱文檔以查看是否允許使用協(xié)程(文檔應(yīng)說(shuō)明該方法“可能是協(xié)程”或“可能返回一個(gè)Future”):
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
有時(shí)你可能想“觸發(fā)并忘記”一個(gè)協(xié)程而不等待它的結(jié)果。在這種情況下,建議使用IOLoop.spawn_callback,這使得IOLoop負(fù)責(zé)呼叫。如果失敗,IOLoop將記錄堆棧跟蹤:
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
對(duì)于使用IOLoop.spawn_callback的函數(shù),建議以這種方式使用?@gen.coroutine
?,但對(duì)于使用?async def
?的函數(shù),則需要以這種方式使用(否則,協(xié)程運(yùn)行程序?qū)o(wú)法啟動(dòng))。
最后,在程序的頂層,如果 IOLoop 尚未運(yùn)行,您可以啟動(dòng)IOLoop,運(yùn)行協(xié)程,然后IOLoop使用IOLoop.run_sync方法停止。這通常用于啟動(dòng)?main
?面向批處理的程序的功能:
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
從協(xié)程調(diào)用阻塞函數(shù)的最簡(jiǎn)單方法是使用IOLoop.run_in_executor,它的返回值 ?Futures
?與協(xié)程兼容:
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
該multi函數(shù)接受值為列表和字典,并等待所有這些?Futures
?:
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
在裝飾協(xié)程中,可以?yield
?直接生成使用list 或dict:
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
有時(shí),保存一個(gè)Future而不是立即放棄它是有用的,這樣你就可以在等待之前開(kāi)始另一個(gè)操作。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
yield self.flush()
這對(duì)裝飾協(xié)程來(lái)說(shuō)更容易一些,因?yàn)樗鼈冊(cè)谡{(diào)用時(shí)立即啟動(dòng):
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
在本地協(xié)同程序中,可以使用?async for
?。在較舊版本的Python中,使用協(xié)同路由進(jìn)行循環(huán)是很棘手的,因?yàn)樵?for
?或?while
?循環(huán)的每次迭代中都無(wú)法找到?yield
?并捕獲結(jié)果。相反,您需要將循環(huán)條件與訪問(wèn)結(jié)果分開(kāi),如本例中的Motor:
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
PeriodicCallback通常不與coroutines一起使用。相反,協(xié)同程序可以包含?while True
?:循環(huán)并使用tornado.gen.sleep:
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
有時(shí)可能需要一個(gè)更復(fù)雜的循環(huán)。例如,前一個(gè)循環(huán)每?60+N
?秒運(yùn)行一次,其中?N
?是?do_something()
?的運(yùn)行時(shí)間。要準(zhǔn)確地每60秒運(yùn)行一次,請(qǐng)使用上面的交錯(cuò)模式:
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.
更多建議: