試験運用中なLinux備忘録・旧記事

はてなダイアリーで公開していた2007年5月-2015年3月の記事を保存しています。

GnashでYouTube動画を再生するスクリプトをPythonに移植

Gnashを使用して、指定されたYouTubeのURLをもとに動画を再生するスクリプトを以前作成し、サーバ側のHTMLコード(JavaScript部分)の変更にともない、数回変更をしている。
ここでは、「Gnash 0.8.2とebuild、動作について」に貼り付けているバージョンをPythonで書いてみた。
[任意]ファイル名: gnash-youtube-20080516

#! /usr/bin/python
# -*- encoding: utf-8 -*-

import subprocess
import urllib
import time
import sys
import os

class ExecuteForeground:
  """
  外部プロセスをフォアグラウンドで実行する
  """
  def __init__(self, **dic):
    self.argslist = dic["cmd"]
    self.subproc_args = { "stdin"     : subprocess.PIPE,
                          "stdout"    : subprocess.PIPE,
                          "stderr"    : subprocess.STDOUT,
                          "cwd"       : dic["workdir"],
                          "close_fds" : True,              }
  def run(self):
    """
    外部プロセスの実行処理
    """
    try:
      p = subprocess.Popen(self.argslist, **self.subproc_args)
    except OSError:
      print u"コマンドの実行に失敗しました: %s".encode("utf-8") % self.argslist[0]
      return False
    (stdouterr, stdin) = (p.stdout, p.stdin)
    while True:
      line = stdouterr.readline()
      if not line:
        break
      print "%s" % line.rstrip()
    p.wait()

class CachePlayer:
  """
  プレーヤのswfファイルをローカルにキャッシュ
  ローカル/リモートのタイムスタンプを確認して
  リモートのほうが新しい場合にダウンロード
  """
  def __init__(self):
    self.player_url = "http://jp.youtube.com/player2.swf"
    self.player_local = "/var/tmp/%s" % os.path.basename(self.player_url)
    ## プレーヤのswfファイルの最終更新日時を確認
    try:
      f = urllib.urlopen(self.player_url)
    except IOError:
      print u"エラー: URL \"%s\" が開けませんでした".encode("utf-8") % self.player_url
      sys.exit(1)
    f.close()
    for l in str(f.info()).splitlines():
      if not l:
        print u"エラー: URL \"%s\" の最終更新日時が取得できませんでした".encode("utf-8") % self.player_url
        sys.exit(1)
      if "Last-Modified:" in l:
        ## 最終更新日時をローカルの経過時刻形式に変換
        try:
          self.timestamp = time.mktime(time.strptime(l, "Last-Modified: %a, %d %b %Y %H:%M:%S GMT")) - time.timezone
        except ValueError:
          print u"エラー: URL \"%s\" の最終更新日時の解析に失敗しました".encode("utf-8") % self.player_url
          sys.exit(1)
  def is_download_needed(self):
    """
    サーバ上のswfファイルが手元のものより新しい場合にTrue、そうでなければFalse
    手元にswfファイルがない場合にもFalse
    サーバ上のタイムスタンプをローカルのファイルに保存するため
    タイムスタンプが同じであれば最新とみなす
    """
    try:
      st = os.stat(self.player_local)
      if st.st_mtime == self.timestamp:
        return False
    except OSError:
      return False
    return True
  def do_download(self):
    """
    player2.swfをダウンロードしてタイムスタンプをサーバ上のものに合わせる
    """
    #print "Updating player..."
    opener = urllib.FancyURLopener({})
    try:
      (filename, headers) = opener.retrieve(self.player_url, self.player_local)
    except IOError:
      print u"\"%s\" の取得に失敗しました".encode("utf-8") % self.player_url
      sys.exit(1)
    ## 最終更新日時をローカル上のファイルのタイムスタンプとしてセットする
    for l in str(headers).splitlines():
      if "Last-Modified: " in l:
        try:
          ## Last-Modified: の値を解析してローカルのタイムスタンプを得る
          timestamp = time.mktime(time.strptime(l, "Last-Modified: %a, %d %b %Y %H:%M:%S GMT")) - time.timezone
        except ValueError:
          print u"エラー: URL \"%s\" の最終更新日時の解析に失敗しました".encode("utf-8") % self.player_url
          sys.exit(1)
        try:
          os.utime(filename, (timestamp, timestamp))  # 場所, (アクセス, 更新)
        except OSError:
          print u"エラー: ファイル \"%s\" のタイムスタンプ操作に失敗しました".encode("utf-8") % filename
          sys.exit(1)
        opener.cleanup()

class GnashYouTube:
  """
  指定されたYouTubeのURLをGnashで再生する
  """
  def check_cmdline(self):
    """
    コマンドライン引数の簡易チェック
    """
    if len(sys.argv) != 2:
      print u"使用法: %s \"http://(jp.)youtube.com/watch?v=...\"".encode("utf-8") % __file__
      sys.exit(1)
    self.watch_url = sys.argv[1]
  def make_flashvars(self):
    """
    YouTubeのHTMLからGnashのFlashVarsを作成
    """
    ## HTMLファイルを取得して解析し、「swfArgs」の行を返す
    try:
      f = urllib.urlopen(self.watch_url, None)
    except IOError:
      print u"エラー: URL \"%s\" が開けませんでした".encode("utf-8") % self.watch_url
      sys.exit(1)
    ## Flashの引数を含む行を探す
    while True:
      line = f.readline()
      if not line:
        ## 引数の行が見つからずに終端に達した
        print u"エラー: Flashの引数が見つかりませんでした".encode("utf-8")
        sys.exit(1)
      if "var swfArgs = {" in line:
        break
    f.close()
    ## Pythonの辞書としてswfArgsを解析し代入(「null」は文法エラー対策でクォート)
    ## 「swfArgs = {"key1" : "value1", "key2" : "value2", ... }」 の代入を行う
    ## 文字列メソッドfind()は「swfArgs」より前の部分を除くのに使用
    exec "%s" % line.replace("null", "\"null\"")[line.find("swfArgs = {"):]
    ## 引数部分の文字列を組み立てる
    return "FlashVars=video_id=%s&l=%s&t=%s&sk=%s" % (swfArgs["video_id"],
                                                      swfArgs["l"],
                                                      swfArgs["t"],
                                                      swfArgs["sk"])
  def main(self):
    """
    処理の開始
    """
    self.check_cmdline()
    c = CachePlayer()
    if c.is_download_needed():
      c.do_download()
    exedic = { "cmd" : ("gnash", "-U", "http://jp.youtube.com", "-P", self.make_flashvars(), c.player_local),
               "workdir" : os.getcwd() }
    cmd = ExecuteForeground(**exedic)
    cmd.run()

if __name__ == "__main__":  # このスクリプトが直接「実行」されたとき
  app = GnashYouTube()
  app.main()

Flashのパラメータに関しては、JavaScriptに書かれているコードをexec文でPythonのコードとして解釈させて辞書に代入してしまうことで簡単に処理できたが、プレーヤのswfファイルをサーバから/var/tmp/以下にダウンロードするところで、サーバのほうが新しい場合にのみ更新するようにする部分が少し面倒だった。
過去のシェルスクリプト版では外部コマンドsedwgetを使用していたが、このスクリプトではGnashを除いて全てPythonのみで動いている。

関連記事:

使用したバージョン:

  • Gnash 0.8.2