試験運用中なLinux備忘録・旧記事

はてなダイアリーで公開していた2007年5月-2015年3月の記事を保存しています。

ドメインの名前解決チェックを行うGUIツールをPyGTKで作成

Pythonでドメイン名からIPv4アドレスを得る」では引数に指定された各ドメインに対して名前解決を試みてそれぞれの結果を出力するPythonスクリプトを作成したが、今回はテキストビューに入力されたドメインを行ごとに読み取って成功したものと失敗したものに分け、更にドメイン名の後ろに「/」が付くものは名前解決を試みる前に飛ばしてこれも別に分けるようなツールを作成した。
(2009/4/22)下のコードは無駄にマルチスレッドを使用しているため、「PyGTKで重い処理をしているときにGUIを固まらせないための手法をまとめる」にシングルスレッド版を貼り付けた。

コード

[任意]ファイル名: filter2.py ライセンス: GPL-3

#! /usr/bin/python
# -*- encoding: utf-8 -*-

# Filter2 20090410 (C) 2009 kakurasan
# Licensed under GPL-3


import threading
import socket
import time
import sys
try:
  import pygtk
  pygtk.require('2.0')
except:
  pass
try:
  import pango
  import gtk
except:
  print >> sys.stderr, 'Error: PyGTK is not installed'
  sys.exit(1)
try:
  from glib import timeout_add as glib_timeout_add
  from glib import source_remove as glib_source_remove
except:
  try:
    from gobject import timeout_add as glib_timeout_add
    from gobject import source_remove as glib_source_remove
  except:
    print >> sys.stderr, 'Error: cannot import GLib functions'
    sys.exit(1)


class MainWindow(gtk.Window):
  """
  メインウィンドウ
  """
  def __init__(self, *args, **kwargs):
    gtk.Window.__init__(self, *args, **kwargs)
    # ショートカットキー(アクセラレータ)
    self.accelgroup = gtk.AccelGroup()
    self.add_accel_group(self.accelgroup)
    # メニュー項目
    self.item_quit = gtk.ImageMenuItem(gtk.STOCK_QUIT, self.accelgroup)
    self.menu_file = gtk.Menu()
    self.menu_file.add(self.item_quit)
    self.item_file = gtk.MenuItem('_File', True)
    self.item_file.set_submenu(self.menu_file)
    self.menubar = gtk.MenuBar()
    self.menubar.append(self.item_file)
    # 部品
    self.textview_input = gtk.TextView()
    self.textview_found = gtk.TextView()
    self.textview_notfound = gtk.TextView()
    self.textview_skipped = gtk.TextView()
    for textview in (self.textview_input, self.textview_found, self.textview_notfound, self.textview_skipped):
      textview.set_wrap_mode(gtk.WRAP_CHAR)
    self.textbuf_input = self.textview_input.get_buffer()
    self.textbuf_found = self.textview_found.get_buffer()
    self.textbuf_notfound = self.textview_notfound.get_buffer()
    self.textbuf_skipped = self.textview_skipped.get_buffer()
    self.txttag_active = gtk.TextTag()
    self.txttag_active.set_property('foreground', 'blue')
    self.txttag_active.set_property('weight', pango.WEIGHT_BOLD)
    self.textbuf_input.get_tag_table().add(self.txttag_active)
    self.id_textbuf_input = self.textbuf_input.connect('changed', self.on_textbuf_changed)
    self.sw_input = gtk.ScrolledWindow()
    self.sw_found = gtk.ScrolledWindow()
    self.sw_notfound = gtk.ScrolledWindow()
    self.sw_skipped = gtk.ScrolledWindow()
    for sw in (self.sw_input, self.sw_found, self.sw_notfound, self.sw_skipped):
      sw.set_policy(gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
    self.sw_input.add(self.textview_input)
    self.sw_found.add(self.textview_found)
    self.sw_notfound.add(self.textview_notfound)
    self.sw_skipped.add(self.textview_skipped)
    self.button_start = gtk.Button(label='_Check')
    self.button_start.set_image(gtk.image_new_from_stock(gtk.STOCK_EXECUTE, gtk.ICON_SIZE_BUTTON))
    self.button_stop = gtk.Button(stock=gtk.STOCK_STOP)
    self.button_stop.set_sensitive(False)
    self.pgbar = gtk.ProgressBar()
    self.statusbar = gtk.Statusbar()
    self.cxtid_linecnt = self.statusbar.get_context_id('linecount')
    self.cxtid_status = self.statusbar.get_context_id('status')
    self.msgid_linecnt = None
    self.msgid_status = None
    # フレーム(説明のために使用・中身はスクロールウィンドウ)
    self.frame_input = gtk.Frame('Input')
    self.frame_input.add(self.sw_input)
    self.frame_found = gtk.Frame('OK')
    self.frame_found.add(self.sw_found)
    self.frame_notfound = gtk.Frame('NG')
    self.frame_notfound.add(self.sw_notfound)
    self.frame_notfound.set_size_request(-1, 100)
    self.frame_skipped = gtk.Frame('Skipped')
    self.frame_skipped.add(self.sw_skipped)
    self.frame_skipped.set_size_request(-1, 60)
    # レイアウト用コンテナ
    self.vbox_output = gtk.VBox()  # 中央右側の縦分割
    self.vbox_output.pack_start(self.frame_found)
    self.vbox_output.pack_start(self.frame_notfound)
    self.vbox_output.pack_start(self.frame_skipped)
    self.hbox_btn = gtk.HBox()  # 下の横分割
    self.hbox_btn.pack_start(self.button_start, expand=False, fill=False)
    self.hbox_btn.pack_start(self.button_stop, expand=False, fill=False)
    self.hbox_btn.pack_start(self.pgbar)
    self.hbox_textview = gtk.HBox()  # 中央の横分割
    self.hbox_textview.pack_start(self.frame_input)
    self.hbox_textview.pack_start(self.vbox_output)
    self.vbox = gtk.VBox()  # 全体の縦分割
    self.vbox.pack_start(self.menubar, expand=False, fill=False)
    self.vbox.pack_start(self.hbox_textview)
    self.vbox.pack_start(self.hbox_btn, expand=False, fill=False)
    self.vbox.pack_start(self.statusbar, expand=False, fill=False)
    # シグナル
    self.connect('delete_event', gtk.main_quit)
    self.item_quit.connect('activate', gtk.main_quit)
    self.button_start.connect('clicked', self.on_button_start_clicked)
    self.button_stop.connect('clicked', self.on_button_stop_clicked)
    # ウィンドウ
    self.add(self.vbox)
    self.set_size_request(350, 400)
    self.resize(400, 450)
  def to_update_progressbar(self):
    """
    処理中にのみ定期的に実行される関数
    プログレスバーを更新
    """
    fraction = float(self.domains_processed) / float(self.domains)
    self.pgbar.set_text('%d / %d' % (self.domains_processed, self.domains))
    self.pgbar.set_fraction(fraction)
    return True
  def th_resolve(self):
    """
    別スレッドで実行する関数
    指定ドメインの名前解決を試みて結果をテキストバッファに出力
    処理の前後にはGUI部品の状態などを設定
    """
    gtk.gdk.threads_enter()
    # ステータスバー
    if self.msgid_status:
      self.statusbar.remove(self.cxtid_status, self.msgid_status)
    self.msgid_status = self.statusbar.push(self.cxtid_status, 'Processing...')
    # 行数カウントによるテキスト更新を一時的にブロック
    self.textbuf_input.handler_block(self.id_textbuf_input)
    # 出力先のテキストバッファを空にする
    for widget in (self.textbuf_found, self.textbuf_notfound, self.textbuf_notfound, self.textbuf_skipped):
      widget.set_text('')
    # 停止ボタンを押せるようにする
    self.button_stop.set_sensitive(True)
    # テキストビューの編集を禁止し、開始ボタンも押せないようにする
    for widget in (self.button_start, self.textview_input, self.textview_found, self.textview_notfound, self.textview_skipped):
      widget.set_sensitive(False)
    # プログレスバー更新のタイムアウト関数を設定
    tag = glib_timeout_add(150, self.to_update_progressbar)
    # テキストビューの各行のドメイン名について名前解決を試みて
    # 処理した項目は強調し(スタイルを付け)ていく
    iter_start = self.textbuf_input.get_start_iter()
    self.domains = self.textbuf_input.get_line_count()
    gtk.gdk.threads_leave()
    self.domains_processed = 0

    # 項目別にループで処理を行う
    while True:
      gtk.gdk.threads_enter()
      iter_end = iter_start.copy()    # 位置情報をコピーして
      iter_end.forward_to_line_end()  # 行末(終点)に持っていく
      domain = self.textbuf_input.get_text(iter_start, iter_end)
      self.textbuf_input.place_cursor(iter_end)
      self.textbuf_input.apply_tag(self.txttag_active, iter_start, iter_end)
      self.textview_input.scroll_to_iter(iter_end, 0)
      gtk.gdk.threads_leave()
      if '/' in domain:  # ドメインの後ろにディレクトリを含むものは飛ばす
        gtk.gdk.threads_enter()
        iter_end_skipped = self.textbuf_skipped.get_end_iter()
        self.textbuf_skipped.place_cursor(self.textbuf_skipped.get_end_iter())
        self.textbuf_skipped.insert_at_cursor(domain + '\n')
        self.textview_skipped.scroll_to_mark(self.textbuf_skipped.get_insert(), 0)
        gtk.gdk.threads_leave()
      else:
        # 名前解決を試行し、結果に応じたテキストバッファに追加
        try:
          socket.gethostbyname(domain)
        except socket.gaierror:  # 失敗
          gtk.gdk.threads_enter()
          iter_end_notfound = self.textbuf_notfound.get_end_iter()
          self.textbuf_notfound.place_cursor(self.textbuf_notfound.get_end_iter())
          self.textbuf_notfound.insert_at_cursor(domain + '\n')
          self.textview_notfound.scroll_to_mark(self.textbuf_notfound.get_insert(), 0)
          gtk.gdk.threads_leave()
        else:                    # 成功
          gtk.gdk.threads_enter()
          self.textbuf_found.place_cursor(self.textbuf_found.get_end_iter())
          self.textbuf_found.insert_at_cursor(domain + '\n')
          self.textview_found.scroll_to_mark(self.textbuf_found.get_insert(), 0)
          gtk.gdk.threads_leave()
      self.domains_processed += 1
      gtk.gdk.threads_enter()
      # 次の行の先頭へ移動し、途中に終わりがあれば抜ける
      forward_line = iter_start.forward_line()
      gtk.gdk.threads_leave()
      if not forward_line:
        break
      # 次の名前解決処理までに少し間隔を空ける
      for _ in range(10):  # 0.2秒を10回、合計2秒ずつとする
        # 停止ボタンが押された場合はsleepを中断する
        if self.stop_requested:
          gtk.gdk.threads_enter()
          glib_source_remove(tag)  # プログレスバーの更新を終了
          self.statusbar.remove(self.cxtid_status, self.msgid_status)
          self.msgid_status = self.statusbar.push(self.cxtid_status, 'Stopped')
          self.button_stop.set_sensitive(False)
          for widget in (self.button_start, self.textview_input, self.textview_found, self.textview_notfound, self.textview_skipped):
            widget.set_sensitive(True)
          self.textbuf_input.handler_unblock(self.id_textbuf_input)
          self.pgbar.set_fraction(0.0)
          self.pgbar.set_text('')
          self.textbuf_input.remove_tag(self.txttag_active, self.textbuf_input.get_start_iter(), self.textbuf_input.get_end_iter())
          gtk.gdk.threads_leave()
          return
        time.sleep(0.2)

    # 各種後始末
    gtk.gdk.threads_enter()
    glib_source_remove(tag)
    self.statusbar.remove(self.cxtid_status, self.msgid_status)
    self.msgid_status = self.statusbar.push(self.cxtid_status, 'Done')
    self.pgbar.set_text('Total:%d OK:%d NG:%d Skipped:%d' % (self.domains, self.textbuf_found.get_line_count() - 1, self.textbuf_notfound.get_line_count() - 1, self.textbuf_skipped.get_line_count() - 1))
    self.pgbar.set_fraction(1.0)
    self.button_stop.set_sensitive(False)
    for widget in (self.button_start, self.textview_input, self.textview_found, self.textview_notfound, self.textview_skipped):
      widget.set_sensitive(True)
    self.textbuf_input.handler_unblock(self.id_textbuf_input)
    self.textbuf_input.remove_tag(self.txttag_active, self.textbuf_input.get_start_iter(), self.textbuf_input.get_end_iter())
    gtk.gdk.threads_leave()
  def on_button_start_clicked(self, widget):
    """
    開始ボタンを押したときの処理
    """
    if self.textbuf_input.get_char_count() > 0:  # 完全に空のときは処理しない
      # stop_requestedは停止ボタンが押されたかどうかを示す真偽値で
      # 別スレッド側でこの値を監視しながらループする
      self.stop_requested = False
      t = threading.Thread(target=self.th_resolve)
      t.start()
  def on_button_stop_clicked(self, widget):
    """
    停止ボタンを押したときの処理
    """
    self.stop_requested = True
  def on_textbuf_changed(self, widget):
    """
    テキストバッファが変更されたときの処理
    """
    # ステータスバーにドメイン数(テキストバッファの行数)を表示
    if self.msgid_linecnt:
      # 前回の文字列があれば消す
      self.statusbar.remove(self.cxtid_linecnt, self.msgid_linecnt)
    self.msgid_linecnt = self.statusbar.push(self.cxtid_linecnt, '%d Domain(s)' % self.textbuf_input.get_line_count())

class Filter2:
  """
  フィルタをフィルタするツール
  """
  def main(self):
    """
    アプリケーションのメイン処理
    """
    win = MainWindow()
    win.show_all()
    gtk.gdk.threads_init()
    gtk.main()


if __name__ == '__main__':
  app = Filter2()
  app.main()

使い方

左側の「Input」のテキストビューにドメイン名を1行に1つずつ並べていって

下の「Check」ボタンを押すと処理が始まり

  • 名前解決に成功したら右上の「OK」のテキストビュー
  • 失敗したら右の真ん中にある「NG」のテキストビュー
  • 「/」を含む項目は右下の「Skipped」のテキストビュー

へそれぞれ振り分けられる。1つのドメインを処理するごとに約2秒間待つようになっていて、途中で停止ボタンを押すと処理を停止することもできる。

処理中は「Input」のテキストビューの処理済み項目が色付けされる。

処理が終了すると統計を出力する。

関連記事: