asyncio なんもわからんから オライリー買って抜粋和訳してみた part 1 ~序章から Quickstart ~

tl;dr

asyncio なんもわからんからこれを買って、私にとって重要そうな場所のみを抜粋和訳してみたよ!

序章

スレッドと asyncio の違いや、良さ悪さ、使い所について書いてある。

私の理解だと、複数の似たようなタスクがあり、ただしCPUを消費するものではなく大半は待機時間のような、ネットワークを介した処理は asyncio が向いている。

例えば、request.get(URL)みたいな処理を 10000 回繰り返したい場合、get してからその結果が帰ってくるまではデータのダウンロード時間が大半であり、CPUはアイドル状態である。 1 サイト 1 秒ダウンロードかかる場合は 10000 秒かかってしまうわけだが、CPUはその間ほとんど遊んでしまっている。 その場合、回線などの問題はさておき、10000 回分の get を同時に実行すれば早くなる。 asyncio を使わないとマルチスレッドで動かせばできるが、スレッドはオーバヘッドが大きく、スレッドの数はそんなに増やせない (10 個くらいとか?)。 そんなときは asyncio でイベントドリブンな処理にすると(例えば)シングルスレッドで get を非同期に実行し、結果が帰ってきたら次の処理をトリガーのように仕込むことをすれば、 効率よく処理が回せる。

そんなことを言っている。気がする。

(正直たとえ話が長くて読むのが飽きた)

Quickstart

  • 7 つの関数を知っておくだけで、通常利用に限って asyncio を使うことができる
  • asyncio はフレームワーク設計者向けであるが、エンドユーザが知っておくべきなのは以下にまとめられる
    • asyncio のイベントループの開始
    • async/await 関数の呼び出し
    • イベントループ上で実行するタスクの作成
    • 同時に実行しているタスクの完了待機
    • 同時実行タスクが完了後のイベントループのクローズ これらのコアの機能を見て、 Python のイベントベースのプログラミングを使ってループを実行する方法を見る。

asyncio の hello world はこんなコード(私が多少改変)

3-1 quickstart.py

import asyncio, datetime

async def main():
    print(datetime.datetime.utcnow(),end=' : ')
    print('Hello!')
    await asyncio.sleep(1.0)
    print(datetime.datetime.utcnow(),end=' : ')
    print('GoodBye!')

asyncio.run(main())

実行結果

2020-12-30 02:42:22.451943 : Hello!
2020-12-30 02:42:23.453100 : GoodBye!

asynciorun() を提供しており、 async def で定義された関数や他のコルーチンを呼び出すことができる。 コルーチンについてはこちらの公式ドキュメントに記載があるが、 async 構文で定義された関数と思っていてよさそう。

hello の後、1 秒間の sleep を経て GoodBye が表示されていることを確認できた。

asyncio.run() が何をするものなのか、というのは高レベル API なので、実際にどんなことをやっているのかを、完全には等価ではないが、イメージのために記載すると、 3-2 のような処理を行っている。 3-2 のコードは本の残りの部分を読んでいく上での考え方を紹介するのに十分近いものである。

3-2 quickstart.py

import asyncio, datetime

async def main():
    print(datetime.datetime.utcnow(),end=' : ')
    print('Hello!')
    await asyncio.sleep(1.0)
    print(datetime.datetime.utcnow(),end=' : ')
    print('GoodBye!')

# asyncio.run(main())

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_until_complete(task)
pending = asyncio.all_tasks(loop=loop)
for task in pending:
    task.cancel()
group = asyncio.gather(*pending, return_exceptions=True)
loop.run_until_complete(group)
loop.close()

実行結果

2020-12-30 02:53:14.428308 : Hello!
2020-12-30 02:53:15.428343 : GoodBye!

同じように Hello の 1 秒後に GoodBye が表示されているのが確認できる。

ここから各処理を見ていく。

loop = asyncio.get_event_loop()

どのコルーチンを実行するにも、 loop インスタンスが必要。1 つのスレッドだけを使用している限り、get_event_loop() は、どこでも毎回同じループインスタンスを取得できます。 もし async def で定義した関数の中からループインスタンスを取得したい場合は、代わりに asyncio.get_running_loop() を呼び出すべき。後ほど解説します。

task = loop.create_task(coro)

これを実行するまでコルーチンは実行されません。返されたタスクオブジェクトは、タスクの状態(例えばタスクがまだ実行中なのか、完了したのか)を監視するために使用することができ、完了したコルーチンから返り値を取得するためにも使用できます。タスクをキャンセルするには task.cancel() を使用します。

※coro はコルーチンを指している

loop.run_until_complete(coro)

run_until_complete() をコールすると、現在のスレッドをブロックします。run_until_complete() は与えられたコルーチンが完了するまでループを実行し続けることに注意してください。 ループ上でスケジュールされている他のすべてのタスクは、ループが実行されている間も実行されます。

group = asyncio.gather(task1,task2,task3,…)

プロセスシグナルを受信したか、 あるいは loop.stop() でループが停止したかで、プログラムの main 部分がアンブロックされると、 run_until_complete() の後のコードが実行されます。 ここで示している標準的な文法は、保留中のタスクを集めてキャンセルし、それらのタスクが完了するまで loop.run_until_complete() を再び使用するというものです。asyncio.run() はキャンセル、ギャザリング、保留中のタスクの終了待ちを全て行うことに注意してください。

loop.close()

loop.close() は通常最後に行うもので、停止した loop インスタンスで呼び出されなければならず、すべてのキューをクリアして Executor をシャットダウンします。停止した loop は再開できますが、 close した場合は消えます(再開できない)

3-1 では asyncio.run() を使用すれば、これらのステップはどれも必要ないことを示していますが、これらのステップを理解することは重要です。なぜなら実際はもっと複雑な状況が出るので、それらに対処するための知識が必要だからです。

基本的な機能として、最後に知るべきはブロッキング関数を実行する方法です。協調的なマルチタスクについては全てのIOバウント関数が必要です。 asyncio は concurrent.future パッケージの API と非常によく似た API を提供しています。このパッケージは ThreadPoolExecutor と ProcessPoolExecutor を提供します。デフォルトはスレッドベースですが、プールベースも選択可能です。先程の例では Executor を省略しましたが、今回は見てみましょう。

3-3 The Basic executor interface

import asyncio, datetime
from time import sleep

async def main():
    print(datetime.datetime.utcnow(),end=' : ')
    print('Hello!')
    await asyncio.sleep(3.0)
    print(datetime.datetime.utcnow(),end=' : ')
    print('GoodBye!')

def blocking():
    print('Start Blocking!')
    sleep(0.5)
    print('GoodBye Blocking!')

loop = asyncio.get_event_loop()
task = loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_until_complete(task)

pending = asyncio.all_tasks(loop=loop)
for task in pending:
    task.cancel()
group = asyncio.gather(*pending, return_exceptions=True)
loop.run_until_complete(group)
loop.close()

実行結果

Start Blocking!
2020-12-30 05:08:40.774532 : Hello!
GoodBye Blocking!
2020-12-30 05:08:43.777301 : GoodBye!

blocking() は従来の(asyncio ではない) sleep() を内部的に呼び出しています。これはこの関数をコルーチンにしてはいけないということを意味します。この問題を解決するには、この関数を Executor で実行する必要があります。このセクションとは関係ないが、blocking の中で実行している sleep より main で実行している sleep (非 blocking ) のほうが長いことに注意してください。blocking の sleep のほうが長い場合、run_until_complete(main())により、main が終わったら次の処理に行き、blocking が途中にも関わらず loop が close されてしまうため、エラーが発生します。つまり、run_in_executor( はメインスレッドをブロックするわけではないということです。

今までasyncio の本質的な部分を見てきましたが、次に範囲を広げて、asyncio API を階層に整理してみましょう。

~つづく(はず)~