スクレイピングで大量のデータを取得するプログラムや機械学習のプログラムを作成すると、プログラムの内部では同じような処理を何度も繰り返しており、並列化させることで処理が高速化される可能性があります。
以前の投稿で紹介しましたが、我が家には6コア12スレッドのサーバーがあるので、このスペックをフルに生かすためにも並列化処理を組み込まない手はありません。
Linux(Ubuntu)自作PCのパーツ購入
ということで、pythonの並列化処理を試しに使ってみたので、備忘録として書いておきたいと思います。
concurrent.futuresモジュール
pythonで並列化処理を行うライブラリはいくつかあるようですが、python3.2で追加された標準モジュールであるconcurrent.futuresというモジュールを使うのが良さそうです。
公式ドキュメントはこちらになります。
Pythonの並列処理で注意すべき点(グローバルインタープリタロック)
Pythonの並列化処理の方法にはマルチスレッドとマルチプロセスの2種類があります。
マルチスレッドはCPUのスレッドを使用し処理を行い、マルチプロセスは別のサブプロセスを起動して処理します。したがって、理論上はマルチスレッドはCPUのスレッド数、マルチプロセスはCPUのコア数が並列化の上限となります。
私のサーバーは6コア、12スレッドなので、マルチプロセスを行う場合は6並列、マルチスレッドの場合は12並列まで可能という感じでしょう。
ただ、Pythonではマルチスレッドの際に、スレッドセーフ(グローバル変数の意図しない書き込みやDBのデッドロックを防ぐ)のためにグローバルインタープリタロック(GIL)が採用されているそうです。
スレッドセーフにしてくれているのはありがたいですが、その反面マルチスレッドの恩恵をあまり受けられないというデメリットがあるのは致し方有りません。
詳細はこちらのサイトを御覧ください。
並列化処理を実装してみる
詳細な説明は上記のサイトに委ねるとして、実際の実装例を示したいと思います。
関数の準備
並列処理を行うサンプルの関数を定義します。ここでは1〜Nまでの数字を足し合わせる関数です。
途中にtime.sleepを入れて0.1秒ストップする処理をしています。
実はこの程度の計算であれば、time.sleepを入れなければ、並列化処理しない(シングルプロセス)の方が早いです。(理由はサブプロセスなどを立ち上げるオーバーヘッドに時間を要するからだと思います。)
ただ、マルチスレッドとマルチプロセスの違いをわかりやすくするために入れていますので、その点ご留意ください。
def task(_n):
s = 0
for i in range(1,_n+1):
s+=i
time.sleep(0.1)
return s
シングルプロセス
まずは、通常のシングルプロセスで実行してみたいと思います。
時間を計測するために、time.time()を入れていますが計算をしている(後ほど並列化したい)箇所は8〜12行目の計算処理という部分になります。
import numpy as np
import time
ns = list(np.arange(1,11)) #1〜10までの数字のリストを作成
start = time.time() #処理開始時間
#========計算処理========
sms_single = []
for n in ns:
sms_single.append(task(n))
#=======================
end = time.time() #処理終了時間
delta = end - start #処理時間
print('処理時間:{}s'.format(round(delta,3)))
#結果
処理時間:5.534s
処理時間は約5.5秒でした。
マルチスレッド
次は、Pythonのマルチスレッディングを使用します。
上記の8〜12行目の箇所を下記のように変更します。
並列処理結果はretという変数に格納されます。retはgeneratorになっており[r for r in ret]とすることでlist形式にすることができます。
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor
ns = list(np.arange(1,11))
start = time.time()
#========計算処理========
with ThreadPoolExecutor(6) as e:
ret = e.map(task, ns)
sms_multi = [r for r in ret]
#=======================
end = time.time()
delta = end - start
print('処理時間:{}s'.format(round(delta,3)))
#スレッド数6の結果
処理時間:1.405s
処理時間は約1.4秒となり早くなっております。
これをスレッド数を12まで増やしたいと思います。上記コードの9行目の(6)の部分を(12)に変更します。
#スレッド数12の結果
処理時間:1.005s
1/2とはまではいきませんでしたが、処理時間はさらに早くなりました。
マルチプロセス
次に、Pythonのマルチプロセスを使用します。
マルチスレッドのThreadPoolExecutorをProcessPoolExecutorに書き換えるだけでOKです。
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor
start = time.time()
#========計算処理========
with ProcessPoolExecutor(6) as e:
ret = e.map(task, ns)
sms_multi = [r for r in ret]
#=======================
end = time.time()
delta = end - start
print('処理時間:{}s'.format(round(delta,3)))
#マルチプロセスの結果
処理時間:1.422s
処理時間は約1.4秒となりマルチスレッド(スレッド数6)の場合とほぼ同じでした。
関数に複数の引数を渡したいとき
上記のコードでは、引数が足し算したい数:Nのみでしたが、複数の引数を渡したい場合もあると思います。その場合は、一旦リストに引数を入れて渡すことになると思います。
例として、先程1〜Nまでの足し算だったのを、足し算の最初の値(from_)と最後の値(to_)を変えて見たいと思います。
関数を次のように定義します。
def task_from_to(args_):
from_, to_ = args_[0], args_[1]
s = 0
for i in range(from_, to_):
s+=i
time.sleep(0.1)
return s
引数は次のように与えます。from_は3〜12まで、to_は3の倍数です。
args = [[i+3,(i+1)*3] for i in range(10)]
print(args)
>>[[3, 3],
[4, 6],
[5, 9],
[6, 12],
[7, 15],
[8, 18],
[9, 21],
[10, 24],
[11, 27],
[12, 30]]
このリスト形式のargsを引数として、下記のように記述すれば複数の引数を渡すことができます。
start = time.time()
#========計算処理========
with ProcessPoolExecutor(12) as e:
ret = e.map(task_from_to, args)
sms_multi = [r for r in ret]
#=======================
end = time.time()
delta = end - start
print('処理時間:{}s'.format(round(delta,3)))
>>処理時間:1.833s
まとめ
今回は、pythonの標準モジュールであるconcurren.futuresのThreadPoolExecutorとProcessPoolExecutorを使ってみました。
同じような計算を大量に回る際にはプログラムの高速化につながると思います。
もっと詳しく知りたい方は以下の書籍が良いかと思います。