試験運用中なLinux備忘録・旧記事

はてなダイアリーで公開していた2007年5月-2015年3月の記事を保存しています。

Pythonを用いてマルチスレッド処理で外部プロセスをバックグラウンド実行する

Pythonで外部プロセスを起動して出力と戻り値を処理する」で扱ったsubprocessモジュールによる外部プロセスの起動のコードをもとに、外部プロセスをバックグラウンドで実行してみるテストを行った。
ここで言うバックグラウンド実行とは、プログラム中で処理を並行して実行する「マルチスレッド」を利用して実行することを示す。
なぉ、下に書いている例はチュートリアルのサンプルコードを参考にしている。
(2014/11/9)ドキュメントのリンク先を修正し、サンプルコードの内容もPython 3で動作するように修正した。

  1. threading.Threadオブジェクト
  2. threading.Threadを直接使用する場合
    1. threading.Threadの子クラスを作成する場合

threading.Threadオブジェクト

Python上でマルチスレッド処理を行うのに便利なのが、threadingモジュールThreadオブジェクトというもので、それぞれのスレッド(処理単位)に行わせるまとまりを作ってこれを指定するだけで、並行した処理を実現する。
これを使用するには、

  • threading.Threadオブジェクト(のインスタンス)を作成するときの引数*1として関数名とその引数を指定する
  • threading.Threadの子クラスを作成し、__init__()run()の2つのメンバ関数(メソッド)をオーバーライドする(独自に置き換えたバージョンにする)

の2つの方法がある(リファレンスの通り)。

threading.Threadを直接使用する場合

#! /usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import print_function

import subprocess
import threading
import locale
import sys
import os

locale.setlocale (locale.LC_ALL, '')

def func (**dic):
  """
  スレッド内で実行される処理を記述した関数
  id  : 表示中の識別用番号
  cmd : 「コマンド + 引数」のリスト
  cwd : 作業ディレクトリ
  """
  id       = dic['id']
  argslist = dic['cmd']
  subproc_args = { 'stdin'     : subprocess.PIPE,
                   'stdout'    : subprocess.PIPE,
                   'stderr'    : subprocess.STDOUT,
                   'cwd'       : dic['cwd'],
                   'close_fds' : True,              }
  try:
    p = subprocess.Popen (argslist, **subproc_args)
  except OSError as e:
    print ('Failed to execute command "{0}": [{1}] {2}'.format (argslist[0], e.errno, e.strerror), file=sys.stderr)
    return
  (stdouterr, stdin) = (p.stdout, p.stdin)
  print ('-- output [{0}] begin --'.format (id))
  if sys.version_info.major == 3:
    while True:
      line = str (stdouterr.readline (), encoding='utf-8')
      #line = stdouterr.readline ().decode ('utf-8')  # decode()を用いる場合
      if not line:
        break
      print (line.rstrip ())
  else:
    while True:
      line = stdouterr.readline ()
      if not line:
        break
      print (line.rstrip ())
  print ('-- output [{0}] end --'.format (id))
  ret = p.wait ()
  print ('[{0}] Return code: {1}'.format (id, ret))

# 関数定義ここまで

os.environ['PATH'] = '/bin:/usr/bin'

cwd = '/'
# 5秒間停止するだけで何も出力しない
dic1 = { 'id'  : 1,
         'cmd' : ['sleep', '5'],
         'cwd' : cwd, }
# 毎秒数字を出力(後に終了)
dic2 = { 'id'  : 2,
         'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 2: $i"; sleep 1; done'],
         'cwd' : cwd, }
# 1秒間に2つの数字を出力(先に終了)
dic3 = { 'id'  : 3,
         'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 3: $i"; sleep 0.5; done'],
         'cwd' : cwd, }


thr1 = threading.Thread (target=func,kwargs=dic1)
print ('before thr1.start()')
thr1.start ()  # 開始
print ('after thr1.start()')
print ('before thr1.join() waiting...')
thr1.join ()  # thr1が終了するのを待ち、一旦止まる
print ('after thr1.join()')

# 以下、2つのプロセスを続けてバックグラウンド実行するテスト
thr2 = threading.Thread (target=func,kwargs=dic2)
print ('before thr2.start()')
thr2.start ()
print ('after thr2.start()')
thr3 = threading.Thread (target=func,kwargs=dic3)
print ('before thr3.start()')
thr3.start ()
print ('after thr3.start()')
  • 「target=[関数名],args=([引数をタプル型にして並べる])」を引数に渡してインスタンスを生成する
  • メンバ関数start()を呼び出したところから処理が開始される
  • メンバ関数join()を呼び出すと、そのスレッドが終了するまで待ち、先の処理に進まない*2

(2008/4/17)kwargs指定により、下の子クラスを使用した例の形に合わせて引数を辞書から渡すように修正

threading.Threadの子クラスを作成する場合

#! /usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import print_function

import subprocess
import threading
import locale
import sys
import os

locale.setlocale (locale.LC_ALL, '')

class ExecuteBackground (threading.Thread):  # threading.Threadを継承
  """
  プロセスをバックグラウンドで実行する
  Threadクラスの子クラスとして定義(__init__()とrun()をオーバーライドして使用)
  threading.Thread.__init__(self)の呼び出しは必須
  """
  def __init__ (self, **dic):
    """
    オブジェクトの初期化
    """
    threading.Thread.__init__ (self)  # 必ず呼び出す
    self._id = dic['id']
    self._args = dic['cmd']
    self._subproc_args = { 'stdin'     : subprocess.PIPE,
                           'stdout'    : subprocess.PIPE,
                           'stderr'    : subprocess.STDOUT,
                           'cwd'       : dic['cwd'],
                           'close_fds' : True,              }

  def run (self):
    """
    スレッド内で行われる処理を記述
    """
    try:
      p = subprocess.Popen (self._args, **self._subproc_args)
    except OSError as e:
      print ('Failed to execute command "{0}": [{1}] {2}'.format (self._args[0], e.errno, e.strerror), file=sys.stderr)
      return
    (stdouterr, stdin) = (p.stdout, p.stdin)
    print ('-- output [{0}] begin --'.format (self._id))
    if sys.version_info.major == 3:
      while True:
        line = str (stdouterr.readline (), encoding='utf-8')
        #line = stdouterr.readline ().decode ('utf-8')  # decode()を用いる場合
        if not line:
          break
        print (line.rstrip ())
    else:
      while True:
        line = stdouterr.readline ()
        if not line:
          break
        print (line.rstrip ())
    print ('-- output [{0}] end --'.format (self._id))
    ret = p.wait ()
    print ('[{0}] Return code: {1}'.format (self._id, ret))

# クラス定義ここまで

os.environ['PATH'] = '/bin:/usr/bin'

cwd = '/'
# 5秒間停止するだけで何も出力しない
dic1 = { 'id'  : 1,
         'cmd' : ['sleep', '5'],
         'cwd' : cwd, }
# 毎秒数字を出力(後に終了)
dic2 = { 'id'  : 2,
         'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 2: $i"; sleep 1; done'],
         'cwd' : cwd, }
# 1秒間に2つの数字を出力(先に終了)
dic3 = { 'id'  : 3,
         'cmd' : ['bash', '-c', 'for ((i = 1; i <= 5; i++)); do echo "Thread 3: $i"; sleep 0.5; done'],
         'cwd' : cwd, }


cmd1 = ExecuteBackground (**dic1)
print ('before cmd1.start()')
cmd1.start ()  # run()を呼び出す
print ('after cmd1.start()')
print ('before cmd1.join() waiting...')
cmd1.join ()
print ('after cmd1.join()')

cmd2 = ExecuteBackground (**dic2)
print ('before cmd2.start()')
cmd2.start ()
print ('after cmd2.start()')
cmd3 = ExecuteBackground (**dic3)
print ('before cmd3.start()')
cmd3.start ()
print ('after cmd3.start()')
  • コンストラク__init__()で必ずthreading.Thread.__init__(self)を実行する
  • メンバ関数run()*3の中に処理を記述
  • __init__()run()以外のメンバ関数はオーバーライドしてはいけない

*1:コンストラクタ(__init__())に渡される

*2:待ち時間の上限(タイムアウト時間)をこのメンバ関数の引数に指定することも可能で、時間切れになったら先の処理に進むことになる

*3:この名前でなければならない