RubyでHadoop Streaming

Hadoop Streamingは標準入出力さえ扱えれば,他の言語でもMapReduceが実現できるようにする仕組みです.
じゃあ,実際どうなの?ということで,Rubyを例にPi計算をしてみました.

事前準備

と言ってもHadoopがインストールされていれば,Rubyをインストールするだけ.

# 本当にこれだけ.
yum install ruby

Pi計算

HadoopのexampleのPi計算で使われているアルゴリズムモンテカルロ法のようなので,
同じ手法を使って計算します.
具体的には参考として載せているURLを参照すると非常に分かりやすいです.

Mapper

Rubyだとシンプルでよい.

result = Array.new(ENV['MAPPER_ARG'].to_i).map {
  x = rand
  y = rand
  l = (x ** 2) + (y ** 2)
  l < 1.0 ? true : nil
}
puts "#{result.nitems}\t#{result.size}"

実行時にMAPPER_ARG変数で打点数を渡します.
falseを使わずnilにすることで,nitemsメソッド一発でtrue分を数えられるようにしてみました.

Reducer

こっちはmapperの結果を受け取って数え上げています.

inside_and_total = STDIN.map { |line|
  line.split("\t").map { |str| str.to_i }
}.transpose

sum_inside = inside_and_total.first.inject(0) { |sum, i| sum + i }
sum_total = inside_and_total.last.inject(0) { |sum, i| sum + i }

puts 4.0 * sum_inside / sum_total

最後の行でPiを出力します.

Hadoopで動作させてみる

事前にinputとして指定する適当なデータを入れておきます.
(もしかしたら,こんなことしなくてもいい方法があるかもしれない...)

$ hadoop fs -mkdir /input
$ hadoop dfs -put text.txt /input

そしていよいよ実行.
map数を10として,10000000回の計算を10セット繰り返します.全部で1億回!

$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2+737.jar -D mapred.map.tasks=10 -cmdenv MAPPER_ARG=10000000 -input /input -output output -mapper path/to/mapper.rb -reducer path/to/reducer.rb
packageJobJar: [/var/lib/hadoop-0.20/cache/user/hadoop-unjar2634399002173509844/] [] /tmp/streamjob1531829330232410806.jar tmpDir=null
...
INFO streaming.StreamJob: Output: output

$ hadoop fs -cat /user/user/output/part-00000
3.14158328	

結構時間も掛かるんですが,まあまあな値が出たんじゃないでしょうか.


しかし,exampleにはとても勝てないのであった...

$ hadoop jar /usr/lib/hadoop/hadoop-0.20.2+737-examples.jar pi 10 10000000
Number of Maps  = 10
Samples per Map = 10000000
...
Estimated value of Pi is 3.14159256000000000000