このサイトについて

翻訳 - Pythonのmultiprocessingモジュールを使ってGoogleの並列処理システムMapReduceをインプリする

翻訳 - Pythonのmultiprocessingモジュールを使ってGoogleの並列処理システムMapReduceをインプリする

PyMOTWをメンテしてるDoug HallmannさんがPython 2.6(および3.0)から導入された,分散コンピューティングを強力にサポートするmultiprocessingモジュールを使ってMapReduceを実装しているのを発見した。MapReduceはGoolgeが使っている並列処理システム。multiprocessingモジュールのサンプルとしても,Pythonを使ったMapReduceの実装としても面白いし,勉強になると思うのでエントリを翻訳してみた。Pythonを覚えることの利点はいくつかあるけど,こういう上質なリソースにリーチする権利を得られる,のも大きな利点の一つだと思う。Pythonには,multiprocessingを初めとするハイクオリティで使い勝手の良いライブラリが附属してくるし,コードは迷いが無くて読みやすい。

とにかくエントリの翻訳をどうぞ。サンプルコードのコメントも翻訳してみたです↓。


今週のPyMOTWに使うためにmiltiprocessingモジュールのサンプルコードを探していたとき,「MapReduceを実装すればいいんじゃね?」と言われたんだ。ということで作ってみた。
PoolクラスはシンプルなシングルサーバMapReduceを実装するのに使える。分散処理の恩恵は受けられないけど,なにかの処理を分散化できるように分割するのがいかに簡単か,ということを示すのには十分だ。


MapReduceでは,入力データは複数のワーカーインスタンスによって複数の小片(チャンク)に分割される。チャンクに分割された入力データは,シンプルな変換手法でもって中間形式にMapされる。中間データは,そのあとキーに従ってまとめられたり分割されたりして,関連する値がまとめられる。最後に,分割されたデータは結果としてReduceされる。これがMapReduceの動くしくみだ。

# coding=utf-8

import collections
import multiprocessing

class SimpleMapReduce(object):
   
    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func

          入力を中間データにMapする呼び出し可能オブジェクト。入力は1つの引数に渡し,
          キーと値のタプルを返す。このタプルがReduceされる。
       
        reduce_func
          分割された中間データをReduceして最終データにする呼び出し可能オブジェクト。
          map_funcが生成したキーを引数としてとる。
        
        num_workers
          ワーカープールに作るワーカーの数。
          デフォルト値は,カレントホストで利用できるCPUの数となる。
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)
   
    def partition(self, mapped_values):
        """Mapされた値をキーでまとめる。キーに値のシーケンスをつなげた辞書を返す。
        """
        partitioned_data = collections.defaultdict(list)
        for sublist in mapped_values:
            for key, value in sublist:
                partitioned_data[key].append(value)
        return partitioned_data
   
    def __call__(self, inputs, chunksize=1):
        """初期化時に与えられた呼び出し可能オブジェクトを使って,入力をMapReduceにかける。
       
        inputs
          処理する入力データのイテレート可能オブジェクト
       
        chunksize=1
          ワーカーに渡す入力データのためのオプション。Mapフェーズのパフォーマンス
          チューニングに利用する。
        """
        mapped_values = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        partitioned_data = self.partition(mapped_values)
        reduced_values = self.pool.map(self.reduce_func, partitioned_data.items())
        return reduced_values

 

ファイルの英単語の数を数える

次のサンプルプログラムは,SimpleMapReduceを使って,マークアップを除外しながらreStructuredTextのソースにある英単語の数を数えるもの。

# coding=utf-8

import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce

def file_to_words(filename):
    """ファイルを読み込み,(単語,出現回数)を返す
    """
    STOP_WORDS = set([
        'a', 'an', 'and', 'are', 'as', 'be', 'for', 'if', 'in',
        'is', 'it', 'of', 'or', 'py', 'rst', 'the', 'to', 'with',
        ])
    TR = string.maketrans(string.punctuation, ' ' * len(string.punctuation))

    print multiprocessing.current_process().name, 'reading', filename
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            if line.lstrip().startswith('..'): # Skip rst comment lines
                continue
            line = line.translate(TR) # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append( (word, 1) )
    return output


def count_words(item):
    """分割されたデータを単語と出現数のタプルに変換する
    """
    word, occurances = item
    return (word, sum(occurances))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')
   
    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()
   
    print '\nTOP 20 WORDS BY FREQUENCY\n'
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print '%-*s: %5s' % (longest+1, word, count)

file_to_wordsが,入力として渡したファイル名は,(単語, 1)というシーケンスのペアに変換する。データはSimpleMapReduce.partitions()メソッドが英単語をキーとして分割する。分割されたデータは英単語と単語の出現数の数値が返る。Reduceのフェーズでは,cound_wordsを呼ぶことでデータを(英単語, カウント)の値に変換する。
以下が実行のサンプル。

$ python multiprocessing_wordcount.py
PoolWorker-1 reading communication.rst
PoolWorker-1 reading index.rst
PoolWorker-1 reading mapreduce.rst
PoolWorker-2 reading basics.rst

TOP 20 WORDS BY FREQUENCY

process         :    74
multiprocessing :    42
worker          :    35
after           :    31
running         :    30
start           :    29
python          :    28
processes       :    27
literal         :    26
header          :    26
pymotw          :    26
end             :    26
daemon          :    23
now             :    22
consumer        :    20
starting        :    18
exiting         :    17
event           :    16
that            :    16
by              :    15

 


2010-08-27 04:51