4.2 版中的新功能。
使用類似于標(biāo)準(zhǔn)庫提供給線程的同步原語來協(xié)調(diào)協(xié)程。 這些類與標(biāo)準(zhǔn)庫的 ?asyncio
包中提供的類非常相似。
請(qǐng)注意,這些原語實(shí)際上不是線程安全的,并且不能用來代替標(biāo)準(zhǔn)庫的線程模塊中的那些原語——它們旨在協(xié)調(diào)單線程應(yīng)用程序中的 Tornado 協(xié)程,而不是保護(hù)多線程應(yīng)用程序中的共享對(duì)象。
一個(gè)?condition
?允許一個(gè)或多個(gè)協(xié)程等待直到收到通知。
與標(biāo)準(zhǔn) ?threading.Condition
? 類似,但不需要獲取和釋放的底層鎖。
使用 ?Condition
?,協(xié)程可以等待其他協(xié)程的通知:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition
condition = Condition()
async def waiter():
print("I'll wait right here")
await condition.wait()
print("I'm done waiting")
async def notifier():
print("About to notify")
condition.notify()
print("Done notifying")
async def runner():
# Wait for waiter() and notifier() in parallel
await gen.multi([waiter(), notifier()])
IOLoop.current().run_sync(runner)
結(jié)果為:
I'll wait right here
About to notify
Done notifying
I'm done waiting
?wait
接受一個(gè)可選的 ?timeout
參數(shù),它可以是一個(gè)絕對(duì)時(shí)間戳:
io_loop = IOLoop.current()
# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)
?datetime.timedelta
? 表示相對(duì)于當(dāng)前時(shí)間的超時(shí):
# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))
如果在截止日期之前沒有通知,則該方法返回 False。
如果條件被通知,則返回一個(gè) ?Future
解析 ?True
?,或者在超時(shí)后解析為 ?False
?。
喚醒n個(gè)waiters
喚醒所有waiters
一個(gè)事件會(huì)阻塞協(xié)程,直到其內(nèi)部標(biāo)志設(shè)置為 ?True
。
類似于?threading.Event
?。
協(xié)程可以等待設(shè)置事件。 一旦設(shè)置,對(duì) ?yield event.wait()
的調(diào)用將不會(huì)阻塞,除非事件已被清除:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event
event = Event()
async def waiter():
print("Waiting for event")
await event.wait()
print("Not waiting this time")
await event.wait()
print("Done")
async def setter():
print("About to set the event")
event.set()
async def runner():
await gen.multi([waiter(), setter()])
IOLoop.current().run_sync(runner)
結(jié)果如下:
Waiting for event
About to set the event
Not waiting this time
Done
如果內(nèi)部標(biāo)志為?True
?,則返回?True
?
將內(nèi)部標(biāo)志設(shè)置為 ?True
?。 所有的waiters都被喚醒了。
設(shè)置標(biāo)志后調(diào)用 ?wait
不會(huì)阻塞。
將內(nèi)部標(biāo)志重置為 ?False
?。
調(diào)用 ?wait
將阻塞,直到調(diào)用 ?set
?。
阻塞直到內(nèi)部標(biāo)志為?True
?。
返回一個(gè) ?awaitable
?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError
?。
在阻塞之前可以獲取固定次數(shù)的鎖。
信號(hào)量管理一個(gè)計(jì)數(shù)器,表示釋放調(diào)用的數(shù)量減去獲取調(diào)用的數(shù)量,再加上一個(gè)初始值。 如果需要,?acquire
方法會(huì)阻塞,直到它可以返回而不使計(jì)數(shù)器為負(fù)。
信號(hào)量限制對(duì)共享資源的訪問。 一次允許兩個(gè)worker訪問:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore
sem = Semaphore(2)
async def worker(worker_id):
await sem.acquire()
try:
print("Worker %d is working" % worker_id)
await use_some_resource()
finally:
print("Worker %d is done" % worker_id)
sem.release()
async def runner():
# Join all workers.
await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner)
結(jié)果如下:
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done
worker 0 和 1 被允許同時(shí)運(yùn)行,但worker 2 等到信號(hào)量被worker 0 釋放一次。
信號(hào)量可以用作異步上下文管理器:
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
為了與舊版本的 Python 兼容,?acquire
是一個(gè)上下文管理器,因此 worker 也可以寫成:
@gen.coroutine
def worker(worker_id):
with (yield sem.acquire()):
print("Worker %d is working" % worker_id)
yield use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
增加計(jì)數(shù)器并喚醒一個(gè)waiter。
減少計(jì)數(shù)器。 返回一個(gè)可等待的。
如果計(jì)數(shù)器為零,則阻塞并等待釋放。awaitable
在截止日期后引發(fā) ?TimeoutError
?。
防止 ?release()
? 被調(diào)用太多次的信號(hào)量。
如果 ?release
增加信號(hào)量的值超過初始值,它會(huì)引發(fā) ?ValueError
。 信號(hào)量主要用于保護(hù)容量有限的資源,因此信號(hào)量釋放次數(shù)過多是錯(cuò)誤的標(biāo)志。
增加計(jì)數(shù)器并喚醒一個(gè)waiter。
減少計(jì)數(shù)器。 返回一個(gè)可等待的。
如果計(jì)數(shù)器為零,則阻塞并等待釋放。 ?awaitable
在截止日期后引發(fā) ?TimeoutError
?。
協(xié)程的鎖。
鎖開始解鎖,并立即獲取鎖。 當(dāng)它被鎖定時(shí),產(chǎn)生?acquire
?的協(xié)程等待直到另一個(gè)協(xié)程調(diào)用?release
?。
釋放未鎖定的鎖會(huì)引發(fā) ?RuntimeError
?。
?Lock
可以用作帶有 ?async with
? 語句的異步上下文管理器:
>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
... async with lock:
... # Do something holding the lock.
... pass
...
... # Now the lock is released.
為了與舊版本的 Python 兼容,?acquire
方法異步返回一個(gè)常規(guī)上下文管理器:
>>> async def f2():
... with (yield lock.acquire()):
... # Do something holding the lock.
... pass
...
... # Now the lock is released.
嘗試鎖定。 返回一個(gè)?awaitable
?。
返回一個(gè) ?awaitable
?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError
?。
解鎖。
排隊(duì)等待獲取的第一個(gè)協(xié)程獲得鎖。
如果未鎖定,則引發(fā) ?RuntimeError
?。
更多建議: