Tornado 同步原語

2022-03-10 14:44 更新

4.2 版中的新功能。

使用類似于標(biāo)準(zhǔn)庫提供給線程的同步原語來協(xié)調(diào)協(xié)程。 這些類與標(biāo)準(zhǔn)庫的 ?asyncio包中提供的類非常相似。

請(qǐng)注意,這些原語實(shí)際上不是線程安全的,并且不能用來代替標(biāo)準(zhǔn)庫的線程模塊中的那些原語——它們旨在協(xié)調(diào)單線程應(yīng)用程序中的 Tornado 協(xié)程,而不是保護(hù)多線程應(yīng)用程序中的共享對(duì)象。

Condition

class tornado.locks.Condition

一個(gè)?condition?允許一個(gè)或多個(gè)協(xié)程等待直到收到通知。

與標(biāo)準(zhǔn) ?threading.Condition? 類似,但不需要獲取和釋放的底層鎖。

使用 ?Condition?,協(xié)程可以等待其他協(xié)程的通知:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition

condition = Condition()

async def waiter():
    print("I'll wait right here")
    await condition.wait()
    print("I'm done waiting")

async def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

async def runner():
    # Wait for waiter() and notifier() in parallel
    await gen.multi([waiter(), notifier()])

IOLoop.current().run_sync(runner)

結(jié)果為:

I'll wait right here
About to notify
Done notifying
I'm done waiting

?wait接受一個(gè)可選的 ?timeout參數(shù),它可以是一個(gè)絕對(duì)時(shí)間戳:

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)

?datetime.timedelta? 表示相對(duì)于當(dāng)前時(shí)間的超時(shí):

# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))

如果在截止日期之前沒有通知,則該方法返回 False。

wait(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[bool]

如果條件被通知,則返回一個(gè) ?Future解析 ?True?,或者在超時(shí)后解析為 ?False?。

notify(n: int = 1) → None

喚醒n個(gè)waiters

notify_all() → None

喚醒所有waiters

事件

class tornado.locks.Event

一個(gè)事件會(huì)阻塞協(xié)程,直到其內(nèi)部標(biāo)志設(shè)置為 ?True

類似于?threading.Event?。

協(xié)程可以等待設(shè)置事件。 一旦設(shè)置,對(duì) ?yield event.wait() 的調(diào)用將不會(huì)阻塞,除非事件已被清除:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event

event = Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Not waiting this time")
    await event.wait()
    print("Done")

async def setter():
    print("About to set the event")
    event.set()

async def runner():
    await gen.multi([waiter(), setter()])

IOLoop.current().run_sync(runner)

結(jié)果如下:

Waiting for event
About to set the event
Not waiting this time
Done

is_set() → bool

如果內(nèi)部標(biāo)志為?True?,則返回?True?

set() → None

將內(nèi)部標(biāo)志設(shè)置為 ?True?。 所有的waiters都被喚醒了。

設(shè)置標(biāo)志后調(diào)用 ?wait不會(huì)阻塞。

clear() → None

將內(nèi)部標(biāo)志重置為 ?False?。

調(diào)用 ?wait將阻塞,直到調(diào)用 ?set ?。

wait(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[None]

阻塞直到內(nèi)部標(biāo)志為?True?。

返回一個(gè) ?awaitable?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError?。

信號(hào)

class tornado.locks.Semaphore(value: int = 1)

在阻塞之前可以獲取固定次數(shù)的鎖。

信號(hào)量管理一個(gè)計(jì)數(shù)器,表示釋放調(diào)用的數(shù)量減去獲取調(diào)用的數(shù)量,再加上一個(gè)初始值。 如果需要,?acquire方法會(huì)阻塞,直到它可以返回而不使計(jì)數(shù)器為負(fù)。

信號(hào)量限制對(duì)共享資源的訪問。 一次允許兩個(gè)worker訪問:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore

sem = Semaphore(2)

async def worker(worker_id):
    await sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        await use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

async def runner():
    # Join all workers.
    await gen.multi([worker(i) for i in range(3)])

IOLoop.current().run_sync(runner)

結(jié)果如下:

Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

worker 0 和 1 被允許同時(shí)運(yùn)行,但worker 2 等到信號(hào)量被worker 0 釋放一次。

信號(hào)量可以用作異步上下文管理器:

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

為了與舊版本的 Python 兼容,?acquire是一個(gè)上下文管理器,因此 worker 也可以寫成:

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

release() → None

增加計(jì)數(shù)器并喚醒一個(gè)waiter。

acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]

減少計(jì)數(shù)器。 返回一個(gè)可等待的。

如果計(jì)數(shù)器為零,則阻塞并等待釋放。awaitable在截止日期后引發(fā) ?TimeoutError?。

有界信號(hào)量

class tornado.locks.BoundedSemaphore(value: int = 1)

防止 ?release()? 被調(diào)用太多次的信號(hào)量。

如果 ?release增加信號(hào)量的值超過初始值,它會(huì)引發(fā) ?ValueError。 信號(hào)量主要用于保護(hù)容量有限的資源,因此信號(hào)量釋放次數(shù)過多是錯(cuò)誤的標(biāo)志。

release() → None

增加計(jì)數(shù)器并喚醒一個(gè)waiter。

acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]

減少計(jì)數(shù)器。 返回一個(gè)可等待的。

如果計(jì)數(shù)器為零,則阻塞并等待釋放。 ?awaitable在截止日期后引發(fā) ?TimeoutError?。

class tornado.locks.Lock

協(xié)程的鎖。

鎖開始解鎖,并立即獲取鎖。 當(dāng)它被鎖定時(shí),產(chǎn)生?acquire?的協(xié)程等待直到另一個(gè)協(xié)程調(diào)用?release?。

釋放未鎖定的鎖會(huì)引發(fā) ?RuntimeError?。

?Lock可以用作帶有 ?async with? 語句的異步上下文管理器:

>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

為了與舊版本的 Python 兼容,?acquire方法異步返回一個(gè)常規(guī)上下文管理器:

>>> async def f2():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

acquire(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[tornado.locks._ReleasingContextManager]

嘗試鎖定。 返回一個(gè)?awaitable?。

返回一個(gè) ?awaitable?,它在超時(shí)后引發(fā) ?tornado.util.TimeoutError?。

release() → None

解鎖。

排隊(duì)等待獲取的第一個(gè)協(xié)程獲得鎖。

如果未鎖定,則引發(fā) ?RuntimeError?。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)