Spark Streaming最初の一歩
はじめに
Spark, SQL on Hadoop etc. Advent Calendar 2014 15日目です。
3日目の記事でもSpark Streamingは紹介されていましたが、私のほうではより初心者向けの基本的な内容を記述しておきたいと思います。
- Spark Streamingとは
- リアルタイムログ検索エンジンを考えてみる
- 実装してみる
Spark Streamingとは
Apache Sparkはインメモリで動作する柔軟なバッチエンジンです。
Spark Streamingはストリームで流れてくるデータに対してApache Sparkで書いたバッチが動作するものです。
誤解を恐れずにいうとSpark Streamingは主に以下の3つの機能を提供しています。
- 各種ストリームデータを取得するためのコネクタ(Receiver)を提供する
- ストリームデータをSparkのRDDのように扱えるDStreamという機構を提供する
- 定期間隔でバッチを実行する機構を提供する(JobScheduler)
一点目はストリームデータを取得する仕組みです。3日目の記事のように自分で用意することも可能ですが、既にHDFS、Flume、Kafka、Twitterと接続するコネクタが用意されています。
二点目はストリームデータをSparkで利用するRDDと同じ扱いができるようにDStreamというデータ機構を提供しています。
これは実装上ではほとんどRDDとは別物なんですがRDDに対する操作とほぼ同じことができるようになっています。
最後は定期間隔でJobを実行するための仕組みです。
とりあえず概要はここまで。
リアルタイムログ検索エンジンを考えてみる
今回はSpark Streamingの動作イメージとして全文検索エンジンを考えてみます。
DBにあるものを検索するのではなく、ログにSQLを吐き出してそれをストリーミング処理してElasticSearchに突っ込みます。
イメージは以下の感じです。
今回のアプリは簡単にElixirで書いたものCRUD時にSQLログを出力しています。
Fluentdについては説明する必要はないかとおもいます、今回はfluent-kafkaプラグインを利用してKafkaに書き出しています。
それではSpark Streamingの実装を見ていきます。
実装してみる
最初に今回書いたコードをGithubにあげてますので気になる方はみてみてください。
pochi/spark-streaming-kafka-sample
書いたソースコードを早速見てみます。
package pochi.spark.streaming import org.apache.spark._ import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.elasticsearch.spark.sql._ import org.apache.spark.storage.StorageLevel import org.apache.spark.Logging import org.apache.log4j.{Level, Logger} object Elastic { def main(args: Array[String]) { val numThreads = 1 val zookeeperQuorum = "localhost:2181" val groupId = "test" val topic = Array("test").map((_, numThreads)).toMap val elasticResource = "apps/elixir_blog" val sc = new SparkConf() .setMaster("local[*]") .setAppName("Elastic Search Indexer App") val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("checkpoint") val logs = KafkaUtils.createStream(ssc, zookeeperQuorum, groupId, topic, StorageLevel.MEMORY_AND_DISK_SER) .map(_._2) logs.foreachRDD { rdd => val sc = rdd.context val sqlContext = new SQLContext(sc) val log = sqlContext.jsonRDD(rdd) log.saveToEs(elixirElasticResource) } ssc.start() ssc.awaitTermination() } }
まず、scのとこまではSparkとかわりませんが、その後にStreamingContextを作っています。
第二引数には間隔を渡してあげます。
その後のval logs...ではKafkaからデータを取ってくるコネクタ部分を記載しています。
これでfluentdから送られてKafkaに保存されているデータが取得できます。
その後は各ログをElasticSearchに保存しています。
ここでSparkSQLを利用しているのはFluentdから送られてきたjsonをElasticSearchに入れるときに
json構造体と認識させるために利用しています。
実行するには以下のどちらかを実行します。
$ sbt run
or
$ sbt package $ spark-submit --class Elastic target/scala-2.10/elastic-assembly.jar
実行すると10秒間隔でログをElasticSearchにいれていることが分かります。