Pythonを用いてマルチスレッド処理で外部プロセスをバックグラウンド実行する
「Pythonで外部プロセスを起動して出力と戻り値を処理する」で扱ったsubprocessモジュールによる外部プロセスの起動のコードをもとに、外部プロセスをバックグラウンドで実行してみるテストを行った。
ここで言うバックグラウンド実行とは、プログラム中で処理を並行して実行する「マルチスレッド」を利用して実行することを示す。
なぉ、下に書いている例はチュートリアルのサンプルコードを参考にしている。
(2014/11/9)ドキュメントのリンク先を修正し、サンプルコードの内容もPython 3で動作するように修正した。
threading.Threadオブジェクト
Python上でマルチスレッド処理を行うのに便利なのが、threadingモジュールのThreadオブジェクトというもので、それぞれのスレッド(処理単位)に行わせるまとまりを作ってこれを指定するだけで、並行した処理を実現する。これを使用するには、
- threading.Threadオブジェクト(のインスタンス)を作成するときの引数*1として関数名とその引数を指定する
- threading.Threadの子クラスを作成し、__init__()とrun()の2つのメンバ関数(メソッド)をオーバーライドする(独自に置き換えたバージョンにする)
の2つの方法がある(リファレンスの通り)。
threading.Threadを直接使用する場合
#! /usr/bin/python # -*- coding: utf-8 -*- from __future__ import print_function import subprocess import threading import locale import sys import os locale.setlocale (locale.LC_ALL, '') def func (**dic): """ スレッド内で実行される処理を記述した関数 id : 表示中の識別用番号 cmd : 「コマンド + 引数」のリスト cwd : 作業ディレクトリ """ id = dic['id'] argslist = dic['cmd'] subproc_args = { 'stdin' : subprocess.PIPE, 'stdout' : subprocess.PIPE, 'stderr' : subprocess.STDOUT, 'cwd' : dic['cwd'], 'close_fds' : True, } try: p = subprocess.Popen (argslist, **subproc_args) except OSError as e: print ('Failed to execute command "{0}": [{1}] {2}'.format (argslist[0], e.errno, e.strerror), file=sys.stderr) return (stdouterr, stdin) = (p.stdout, p.stdin) print ('-- output [{0}] begin --'.format (id)) if sys.version_info.major == 3: while True: line = str (stdouterr.readline (), encoding='utf-8') #line = stdouterr.readline ().decode ('utf-8') # decode()を用いる場合 if not line: break print (line.rstrip ()) else: while True: line = stdouterr.readline () if not line: break print (line.rstrip ()) print ('-- output [{0}] end --'.format (id)) ret = p.wait () print ('[{0}] Return code: {1}'.format (id, ret)) # 関数定義ここまで os.environ['PATH'] = '/bin:/usr/bin' cwd = '/' # 5秒間停止するだけで何も出力しない dic1 = { 'id' : 1, 'cmd' : ['sleep', '5'], 'cwd' : cwd, } # 毎秒数字を出力(後に終了) dic2 = { 'id' : 2, 'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 2: $i"; sleep 1; done'], 'cwd' : cwd, } # 1秒間に2つの数字を出力(先に終了) dic3 = { 'id' : 3, 'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 3: $i"; sleep 0.5; done'], 'cwd' : cwd, } thr1 = threading.Thread (target=func,kwargs=dic1) print ('before thr1.start()') thr1.start () # 開始 print ('after thr1.start()') print ('before thr1.join() waiting...') thr1.join () # thr1が終了するのを待ち、一旦止まる print ('after thr1.join()') # 以下、2つのプロセスを続けてバックグラウンド実行するテスト thr2 = threading.Thread (target=func,kwargs=dic2) print ('before thr2.start()') thr2.start () print ('after thr2.start()') thr3 = threading.Thread (target=func,kwargs=dic3) print ('before thr3.start()') thr3.start () print ('after thr3.start()')
- 「target=[関数名],args=([引数をタプル型にして並べる])」を引数に渡してインスタンスを生成する
- メンバ関数start()を呼び出したところから処理が開始される
- メンバ関数join()を呼び出すと、そのスレッドが終了するまで待ち、先の処理に進まない*2
(2008/4/17)kwargs指定により、下の子クラスを使用した例の形に合わせて引数を辞書から渡すように修正
threading.Threadの子クラスを作成する場合
#! /usr/bin/python # -*- coding: utf-8 -*- from __future__ import print_function import subprocess import threading import locale import sys import os locale.setlocale (locale.LC_ALL, '') class ExecuteBackground (threading.Thread): # threading.Threadを継承 """ プロセスをバックグラウンドで実行する Threadクラスの子クラスとして定義(__init__()とrun()をオーバーライドして使用) threading.Thread.__init__(self)の呼び出しは必須 """ def __init__ (self, **dic): """ オブジェクトの初期化 """ threading.Thread.__init__ (self) # 必ず呼び出す self._id = dic['id'] self._args = dic['cmd'] self._subproc_args = { 'stdin' : subprocess.PIPE, 'stdout' : subprocess.PIPE, 'stderr' : subprocess.STDOUT, 'cwd' : dic['cwd'], 'close_fds' : True, } def run (self): """ スレッド内で行われる処理を記述 """ try: p = subprocess.Popen (self._args, **self._subproc_args) except OSError as e: print ('Failed to execute command "{0}": [{1}] {2}'.format (self._args[0], e.errno, e.strerror), file=sys.stderr) return (stdouterr, stdin) = (p.stdout, p.stdin) print ('-- output [{0}] begin --'.format (self._id)) if sys.version_info.major == 3: while True: line = str (stdouterr.readline (), encoding='utf-8') #line = stdouterr.readline ().decode ('utf-8') # decode()を用いる場合 if not line: break print (line.rstrip ()) else: while True: line = stdouterr.readline () if not line: break print (line.rstrip ()) print ('-- output [{0}] end --'.format (self._id)) ret = p.wait () print ('[{0}] Return code: {1}'.format (self._id, ret)) # クラス定義ここまで os.environ['PATH'] = '/bin:/usr/bin' cwd = '/' # 5秒間停止するだけで何も出力しない dic1 = { 'id' : 1, 'cmd' : ['sleep', '5'], 'cwd' : cwd, } # 毎秒数字を出力(後に終了) dic2 = { 'id' : 2, 'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 2: $i"; sleep 1; done'], 'cwd' : cwd, } # 1秒間に2つの数字を出力(先に終了) dic3 = { 'id' : 3, 'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 3: $i"; sleep 0.5; done'], 'cwd' : cwd, } cmd1 = ExecuteBackground (**dic1) print ('before cmd1.start()') cmd1.start () # run()を呼び出す print ('after cmd1.start()') print ('before cmd1.join() waiting...') cmd1.join () print ('after cmd1.join()') cmd2 = ExecuteBackground (**dic2) print ('before cmd2.start()') cmd2.start () print ('after cmd2.start()') cmd3 = ExecuteBackground (**dic3) print ('before cmd3.start()') cmd3.start () print ('after cmd3.start()')