asyncio なんもわからんから オライリー買って抜粋和訳してみた part 2 ~ asyncio の塔からコルーチン ~

tl;dr

asyncio なんもわからんからこれを買って、私にとって重要そうな場所のみを抜粋和訳してみたよ!

part1

asyncio の塔

前のセクション(part1)で見たように、エンドユーザ開発者(=フレームワーク開発者でない)として asyncio を使うために知っておく必要のあるコマンドはわずかです。残念なことに asyncio のドキュメントは膨大な数の API を提示していますが、フラットなフォーマットで書かれているためで、一般的な使用を目的としたもので、エンドユーザ向けかがわかりづらいです。フレームワーク設計者が同じドキュメントを見る場合は新しいフレームワークサードパーティのライブラリと接続するためのフックポイントを探すために膨大なAPIがあります。このセクションでは、フレームワーク設計者の目を通して asyncio を見て、新しい非同期互換ライブラリの構築にどのようにアプローチするかを理解します。表3-1 は階層構造になっているので理解の助けになるかと思います。

Level Concept Implementation
Tier 9 Network: Streams StreamReader, StreamWriter, asyncio.open_connection(), asyncio.start_server()
Tier 8 Network: TCP&UDP Protocol
Tier 7 transports BaseTransport
Tier 6 Tools asyncio.Queue
Tier 5 Subprocesses & threads run_in_executor(), asyncio.subprocess
Tier 4 Tasks asyncio.Task, asyncio.create_task()
Tier 3 Futures asyncio.Future
Tier 2 Event Loop asyncio.run(), BaseEventLoop
Tier 1 Coroutines async def, async with, async for, await

Tier1 には、コルーチンがあります。非同期フレームワークが他に2つあり、 Curio と Trio が上げられます。どちらも Python のネイティブコルーチンにのみ依存しており、 asyncio ライブラリは利用しておりません。

Tier2 はイベントループです。コルーチンはそれ自体ではなんの役にもたたず、実行のためのループがなければ何もしません。(Curio と Trio は独自のイベントループを実装します)使用と実装が明確に分離されているので、サードパーティの開発者がイベントループのだいたい実装を行うことも可能です。Tier3 と 4 は future と task があり、これらは密接に関連しています。task は future のサブクラスであるために分類されていますがある意味で同じ階層にあるとも言えます。task はイベントループ上で実行されるコルーチンを表します。簡単に言うと、 future はループ対応ですが、 task はループ対応とコルーチン対応の両面を持っています。

Tier 5 は別のスレッド、あるいは別のプロセスで実行されなければならない作業を起動したり待機したりするための機能を表しています。

Tier 6 はasyncio.Queue のような追加の async 対応ツールです。asyncio が提供するキューは、キューモジュールのスレッドセーフキューと非常によく似た API を持っており、 asyncio バージョンでは get() と put() でawait キーワードを必要とすることを除けば、かなり似ています。queue.Queue を直接コルーチンで使うことはできません。最後に Tier7 ~ 9 ですが、エンドユーザが最も利用しやすいのが Tier9 のストリーム API です。Tier8 は 9 をより詳細にしたもので、Tier7 はトランスポート層で、他の人が使用するフレームワークを作成して、トランスポートの設定方法をカスタマイズする必要がない限り、触る必要はほとんどありません。Quickstart ではasyncio ライブラリを使い始めるために必要な最低限の知識を見てきましたが、ここから asyncio ライブラリの API 全体がどのようにまとめられているかを見ます。

コルーチン

async def キーワード

3-4

async def f():
    return 123
print(type(f))
import inspect
print(inspect.iscoroutinefunction(f))

実行結果

<class 'function'>
True

これが最もシンプルなコルーチンの宣言ですが、fの正確な型は "coroutine "ではありません。async def関数をコルーチンと呼ぶのが一般的ですが、厳密にはPythonではコルーチン関数と考えられています。この動作は、Pythonでのジェネレータ関数の動作と同じです。

generator の例

def g():
    yield 123
print(type(g))
gen = g()
print(type(gen))

実行結果

<class 'function'>
<class 'generator'>

g が「ジェネレータ」と誤って呼ばれることがありますが、g は関数であることに変わりはなく、この関数が評価されて初めてジェネレータが返されます。コルーチン関数は全く同じように動作します。コルーチンオブジェクトを取得するには、async def関数を呼び出す必要があります。 標準ライブラリの inspect モジュールは、組み込みの type() 関数よりもはるかに優れたイントロスペクティブ機能を提供することができます。 通常の関数とコルーチン関数を区別するための iscoroutinefunction() 関数があります。

先程の f 関数のつづき

async def f():
    return 123
coro = f()
print(type(coro))
print(inspect.iscoroutine(coro))

実行結果

<class 'coroutine'>
True

ここで元の質問に戻ります。コルーチンとは、完了前に中断されていた基礎となる関数を再開する機能をカプセル化したオブジェクトです。聞き覚えがあるかもしれませんが、コルーチンはジェネレータに非常に似ているためです。実際、Python 3.5 で async def と await キーワードを持つネイティブのコルーチンが導入される前は、Python 3.4 で特別なデコレータを持つ通常のジェネレータを使用することで、すでに asyncio ライブラリを使用することが可能でした。Python がどのようにそれらを利用しているかを見るために、もう少しコルーチンオブジェクトを使って遊ぶことができます。最も重要なことは、Pythonがどのようにしてコルーチン間で実行を "切り替える "ことができるのかを見たいということです。まず、戻り値がどのようにして得られるのかを見てみましょう。コルーチンが戻ると、実際に何が起こるかというと、StopIteration例外が発生します。先ほどの例と同じセッションを続けている3-6は、そのことを明確にしています。

3-6

async def f():
    return 123
coro = f()
try:
    coro.send(None)
except StopIteration as e:
    print('The answer was:', e.value)

実行結果

The answer was: 123

コルーチンは、Noneを「送信」することで開始されます。内部的には、これがイベントループがコルーチンに対して行うことになりますので、これを手動で行う必要はありません。あなたが作成したすべてのコルーチンは、loop.create_task(coro)かawait coroのどちらかで実行されます。.send(None)を裏で実行するのはループです。 coroutineが戻ってくると、StopIterationと呼ばれる特殊な種類の例外が発生します。例外自体のvalue属性を使って、コアーチンの戻り値にアクセスできることに注意してください。繰り返しになりますが、このように動作することを知る必要はありません。エンドユーザから見ると、async def関数は、通常の関数と同じように、単にリターン文で値を返すだけです。 send()とStopIterationの2つのポイントは、それぞれ実行中のコルーチンの開始と終了を定義します。これまでのところ、これは関数を実行するための本当に複雑な方法のように見えましたが、それでも構いません。イベントループは、これらの低レベルの内部構造を持つコルーチンを駆動する役割を果たします。エンドユーザは、ループ上で実行するためにコルーチンをスケジュールするだけで、通常の関数のようにトップダウンで実行されます。次のステップでは、どのようにしてコルーチンの実行を中断できるかを見てみましょう。

await キーワード

この新しいキーワード await は常にパラメータを取り、下記のどちらか(AND や OR ではなく XOR)で定義される awaitable と呼ばれるものだけを受けつけます。

  • コルーチン( つまりasync def で定義された関数を呼び出した結果)
  • await() を実装した特殊なオブジェクト。 await() メソッドはイテレータを返さなければなりません。

2 つ目の await() を実装したオブジェクト)は、この本の対象外ですが(日々の asyncio プログラミングでは決して必要ないでしょう)、最初のユースケースは、例 3-7 で示されているように、非常に簡単です。

3-7 using await on a coroutine ※一部私が改変

import asyncio
async def f():
    await asyncio.sleep(1.0)
    return 123
async def main():
    result = await f() # await を指定した場合は 123 の int オブジェクトが格納される
    # result = f() # 仮にこっちだった場合は result はコルーチンオブジェクト
    print(result)
    print(type(result))
    return result
asyncio.run(main())

実行結果

123
<class 'int'>

f() をした場合はコルーチンが生成されます。それは実行の完了を待つことが許可されていることを意味しています。 result には f() が完了した際には 123 の値の int オブジェクトが格納されます。

このセクションを終わってイベントループに移る前に、コルーチンにどのようにして例外が与えられるかを見ておくと理解が進みます。これは一般的にタスクのキャンセルに用いられ、task.cancel() した際、イベントループは内部的に coro.throw() を使用して、コルーチンの内部で asyncio.CancelelledError を発生させます(例 3-8)

3-8 Using coro.throw() to inject exceptions into a coroutine ※一部私が改変

import asyncio
async def f():
    print('start sleep')
    await asyncio.sleep(5.0)
    print('end sleep')
    return 123

coro = f() # コルーチン関数 f() から新しいコルーチンが作成
coro.send(None)
coro.throw(Exception, 'Blah') # 別の send() を行う代わりに throw() を呼び出し、例外クラスと値を指定します。これにより、コルーチン内の待機ポイントで例外が発生します。 throw() メソッドは、タスクのキャンセルに (asyncio の内部で) 使用されます。

実行結果

start sleep
Traceback (most recent call last):
  File "***/main.py", line 10, in <module>
    coro.throw(Exception, 'Blah')
  File "***/main.py", line 4, in f
    await asyncio.sleep(5.0)
  File "***\asyncio\tasks.py", line 595, in sleep
    return await future
Exception: Blah

5 秒の sleep の間に例外を発生させ、処理をストップさせることに成功しています。

例3-9では、新しいコルーチンの中でキャンセルを処理することにします。

例 3-9. Coroutine cancellation with CancelledError ※一部私で改変

import asyncio
async def f():
    try:
        while True:
            print('f exec')
            await asyncio.sleep(0) # pause 的な役割をし、 イベントループが戻ってくると次に進む模様
            print('f executed')
    except asyncio.CancelledError: # 注 1
        print('I was cancelled!') # 注 2
    else:
        return 111
coro = f()
coro.send(None)
coro.send(None)
coro.throw(asyncio.CancelledError) # 注 3

実行結果

Traceback (most recent call last):
  File "***/main.py", line 15, in <module>
    coro.throw(asyncio.CancelledError)
StopIteration # 注 4
f exec
f executed
f exec
I was cancelled!# 注 5
  1. コルーチン関数は例外を扱えるようになりました。実際にはタスクキャンセルのために asyncio ライブラリ全体で使用されている特定の例外タイプである asyncio.CancelledError を処理します。この例外は、外部からコルーチンに注入されていることに注意してください。実際のコードでは、後述されていますが、タスクがキャンセルされると、 CanceledError はタスクがラップされたコルーチンの内部で発生します。
  2. タスクがキャンセルされたことを伝える出力です。例外を処理することで、例外が伝搬しなくなり、コルーチンが戻ってくることを保証していることに注意してください。
  3. CancelledError 例外を throw() しています。
  4. これで、コルーチンは正常に終了します。(すでに説明したとおり、StopIteration 例外は、コルーチンが終了する正常な方法です)
  5. 予想通りキャンセルされています

タスクのキャンセルがどのように通常の例外の発生(および処理)以外の何物でもないかという点について、例 3-10 を見てみましょう。

3-10 For educational purpose only - don't do this !

import asyncio
async def f():
    try:
        while True: await asyncio.sleep(0)
    except asyncio.CancelledError:
        print('Nope!')
        while True: await asyncio.sleep(0) # 注1
    else:
        return 111
coro = f()
coro.send(None)
coro.throw(asyncio.CancelledError) # 注 2
coro.send(None) # 注 3

実行結果

Nope!
  1. メッセージを出力する代わりにキャンセルした後、待機状態に戻ると何が起こるのでしょうか?
  2. 当然のことながら外側のコルーチンは行き続け、すぐに新しいコルーチンでサスペンドします。
  3. 全てが正常に進行し、コルーチンは予想道理にサスペンドし、再開し続けます。

もちろんこれを実際にはやってはいけないことだというのは言うまでも有りません。もしあなたのコルーチンがキャンセル信号を受信した場合は、それは必要なクリーンアップだけを行って終了するように、との明確な指示であり、無視してはいけません。 全ての .send(None) を手動で行ってイベントループを行うのは辛いので、 3-11 では asyncio で提供されているループを利用し、それに応じて3-10をクリーンアップします。

3-11 Using the event loop

import asyncio
async def f():
    await asyncio.sleep(0)
    return 111
loop = asyncio.get_event_loop()
coro = f()
print(loop.run_until_complete(coro)) # コルーチンを完了するまで実行します。内部的には、.send(None)メソッドの呼び出しをすべて行っています。

出力結果

111