Scala on Hadoop: Hadoop Conference

先日、Hadoop ConferenceScala on Hadoopというタイトルで発表してきました。スライドを以下に置いておきます。

ダイジェストとして、ScalaをHadoopで動かすための方法を書いておきます。

まず、Hadoop上でScalaを実行させるためには、JavaScalaを接続するライブラリが必要となります。ここでは、SHadoop( http://code.google.com/p/jweslley/source/browse/#svn/trunk/scala/shadoop )を使用します。SHadoopは、型変換を行うシンプルなライブラリです。

よくあるWordCountのサンプル、WordCount.scala (http://blog.jonhnnyweslley.net/2008/05/shadoop.html より)は以下の通りです。

package shadoop

import SHadoop._
import java.util.Iterator
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._

object WordCount {

  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {

    val one = 1

    def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter) = 
      (value split " ") foreach (output collect (_, one))
  }

  class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
    def reduce(key: Text, values: Iterator[IntWritable], 
        output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
      val sum = values reduceLeft ((a: Int, b: Int) => a + b)
      output collect (key, sum)
    }
  }

  def main(args: Array[String]) = {
    val conf = new JobConf(classOf[Map])
    conf setJobName "wordCount"

    conf setOutputKeyClass classOf[Text]
    conf setOutputValueClass classOf[IntWritable]

    conf setMapperClass classOf[Map]
    conf setCombinerClass classOf[Reduce]
    conf setReducerClass classOf[Reduce]

    conf setInputFormat classOf[TextInputFormat]
    conf setOutputFormat classOf[TextOutputFormat[_ <: WritableComparable, _ <: Writable]]

    conf setInputPath(args(0))
    conf setOutputPath(args(1))

    JobClient runJob conf
  }

}

これをコンパイルして、jarに固めます。

scalac -classpath $HADOOP_HOME/hadoop-*-core.jar:/usr/share/commons-cli-1/lib/commons-cli.jar -d classes WordCount.scala
jar -cvf wordcount.jar -C classes .

jarができたら、ローカルにて実行してみます。事前にscala-library.jarを$HADOOP_HOME/libにコピーするなどしてクラスパスを通す必要があります。また、/var/tmp/inputに入力ファイルを配置しておいてください。

 $HADOOPDIR/bin/hadoop jar ./wordcount.jar shadoop.WordCount -D mapred.job.tracker=local -D fs.default.name=file:///var/tmp input output

うまく実行されば、outputフォルダに結果のファイルが出力されているはずです。

クラスタで実行

クラスタで実行させるには以下のようにします。(もちろん事前にクラスタのセットアップは完了させておいてください。また、scala-library.jarなどのjarファイルを適切に配置してクラスパスを通しておく必要もあります。)

$HADOOP_HOME/bin/hadoop jar \
  ./wordcount.jar \
  shadoop.WordCount \
  input output

Mavenで扱う

Maven( http://maven.apache.org/ )は、Java系のプロジェクト管理ツールです。

まず、Scala向けのプロジェクト作成をします。動的に色々パッケージをダウンロード・インストールしますので、けっこう時間がかかります。

mvn \
 org.apache.maven.plugins:maven-archetype-plugin:2.0-alpha-4:create \
  -DarchetypeGroupId=org.scala-tools.archetypes \ 
  -DarchetypeArtifactId=scala-archetype-simple \
  -DarchetypeVersion=1.2 \
  -DremoteRepositories=http://scala-tools.org/repo-releases \
  -DgroupId=com.hatena.hadoop \
  -DartifactId=hadoop 

次に、Hadoop関連jarの登録を登録します。

mvn install:install-file \
  -DgroupId=org.apache.hadoop \
  -DartifactId=hadoop-core \
  -Dversion=0.20.1 \
  -Dpackaging=jar \
  -Dfile=/opt/hadoop/hadoop-0.20.1-core.jar

最後に依存関係をpom.xmlに記述します。

<dependency>
  <groupId>commons-logging</groupId>
  <artifactId>commons-logging</artifactId>
  <version>1.0.4</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>commons-cli</groupId>
  <artifactId>commons-cli</artifactId>
  <version>1.0</version>
  <scope>provided</scope>
</dependency>

その後、以下のコマンドでコンパイル・パッケージ化・消去ができます。

mvn scala:compile
mvn package
mvn clean

これで、Scalaで自由自在にHadoopのアプリケーションが書けるようになりました!色々試して遊んでみましょう。