PyMOTWをメンテしてるDoug HallmannさんがPython 2.6(および3.0)から導入された,分散コンピューティングを強力にサポートするmultiprocessingモジュールを使ってMapReduceを実装しているのを発見した。MapReduceはGoolgeが使っている並列処理システム。multiprocessingモジュールのサンプルとしても,Pythonを使ったMapReduceの実装としても面白いし,勉強になると思うのでエントリを翻訳してみた。Pythonを覚えることの利点はいくつかあるけど,こういう上質なリソースにリーチする権利を得られる,のも大きな利点の一つだと思う。Pythonには,multiprocessingを初めとするハイクオリティで使い勝手の良いライブラリが附属してくるし,コードは迷いが無くて読みやすい。
とにかくエントリの翻訳をどうぞ。サンプルコードのコメントも翻訳してみたです↓。
今週のPyMOTWに使うためにmiltiprocessingモジュールのサンプルコードを探していたとき,「MapReduceを実装すればいいんじゃね?」と言われたんだ。ということで作ってみた。
PoolクラスはシンプルなシングルサーバMapReduceを実装するのに使える。分散処理の恩恵は受けられないけど,なにかの処理を分散化できるように分割するのがいかに簡単か,ということを示すのには十分だ。
MapReduceでは,入力データは複数のワーカーインスタンスによって複数の小片(チャンク)に分割される。チャンクに分割された入力データは,シンプルな変換手法でもって中間形式にMapされる。中間データは,そのあとキーに従ってまとめられたり分割されたりして,関連する値がまとめられる。最後に,分割されたデータは結果としてReduceされる。これがMapReduceの動くしくみだ。
import collections
import multiprocessing
class SimpleMapReduce(object):
def __init__(self, map_func, reduce_func, num_workers=None):
"""
map_func
入力を中間データにMapする呼び出し可能オブジェクト。入力は1つの引数に渡し,
キーと値のタプルを返す。このタプルがReduceされる。
reduce_func
分割された中間データをReduceして最終データにする呼び出し可能オブジェクト。
map_funcが生成したキーを引数としてとる。
num_workers
ワーカープールに作るワーカーの数。
デフォルト値は,カレントホストで利用できるCPUの数となる。
"""
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = multiprocessing.Pool(num_workers)
def partition(self, mapped_values):
"""Mapされた値をキーでまとめる。キーに値のシーケンスをつなげた辞書を返す。
"""
partitioned_data = collections.defaultdict(list)
for sublist in mapped_values:
for key, value in sublist:
partitioned_data[key].append(value)
return partitioned_data
def __call__(self, inputs, chunksize=1):
"""初期化時に与えられた呼び出し可能オブジェクトを使って,入力をMapReduceにかける。
inputs
処理する入力データのイテレート可能オブジェクト
chunksize=1
ワーカーに渡す入力データのためのオプション。Mapフェーズのパフォーマンス
チューニングに利用する。
"""
mapped_values = self.pool.map(self.map_func, inputs, chunksize=chunksize)
partitioned_data = self.partition(mapped_values)
reduced_values = self.pool.map(self.reduce_func, partitioned_data.items())
return reduced_values
ファイルの英単語の数を数える
次のサンプルプログラムは,SimpleMapReduceを使って,マークアップを除外しながらreStructuredTextのソースにある英単語の数を数えるもの。
import multiprocessing
import string
from multiprocessing_mapreduce import SimpleMapReduce
def file_to_words(filename):
"""ファイルを読み込み,(単語,出現回数)を返す
"""
STOP_WORDS = set([
'a', 'an', 'and', 'are', 'as', 'be', 'for', 'if', 'in',
'is', 'it', 'of', 'or', 'py', 'rst', 'the', 'to', 'with',
])
TR = string.maketrans(string.punctuation, ' ' * len(string.punctuation))
print multiprocessing.current_process().name, 'reading', filename
output = []
with open(filename, 'rt') as f:
for line in f:
if line.lstrip().startswith('..'): # Skip rst comment lines
continue
line = line.translate(TR) # Strip punctuation
for word in line.split():
word = word.lower()
if word.isalpha() and word not in STOP_WORDS:
output.append( (word, 1) )
return output
def count_words(item):
"""分割されたデータを単語と出現数のタプルに変換する
"""
word, occurances = item
return (word, sum(occurances))
if __name__ == '__main__':
import operator
import glob
input_files = glob.glob('*.rst')
mapper = SimpleMapReduce(file_to_words, count_words)
word_counts = mapper(input_files)
word_counts.sort(key=operator.itemgetter(1))
word_counts.reverse()
print '\nTOP 20 WORDS BY FREQUENCY\n'
top20 = word_counts[:20]
longest = max(len(word) for word, count in top20)
for word, count in top20:
print '%-*s: %5s' % (longest+1, word, count)
以下が実行のサンプル。
PoolWorker-1 reading communication.rst
PoolWorker-1 reading index.rst
PoolWorker-1 reading mapreduce.rst
PoolWorker-2 reading basics.rst
TOP 20 WORDS BY FREQUENCY
process : 74
multiprocessing : 42
worker : 35
after : 31
running : 30
start : 29
python : 28
processes : 27
literal : 26
header : 26
pymotw : 26
end : 26
daemon : 23
now : 22
consumer : 20
starting : 18
exiting : 17
event : 16
that : 16
by : 15