Spark Streaming最初の一歩

はじめに

Spark, SQL on Hadoop etc. Advent Calendar 2014 15日目です。
3日目の記事でもSpark Streamingは紹介されていましたが、私のほうではより初心者向けの基本的な内容を記述しておきたいと思います。

  • Spark Streamingとは
  • リアルタイムログ検索エンジンを考えてみる
  • 実装してみる
Spark Streamingとは

Apache Sparkはインメモリで動作する柔軟なバッチエンジンです。
Spark Streamingはストリームで流れてくるデータに対してApache Sparkで書いたバッチが動作するものです。
誤解を恐れずにいうとSpark Streamingは主に以下の3つの機能を提供しています。

  • 各種ストリームデータを取得するためのコネクタ(Receiver)を提供する
  • ストリームデータをSparkのRDDのように扱えるDStreamという機構を提供する
  • 定期間隔でバッチを実行する機構を提供する(JobScheduler)

一点目はストリームデータを取得する仕組みです。3日目の記事のように自分で用意することも可能ですが、既にHDFS、Flume、Kafka、Twitterと接続するコネクタが用意されています。

二点目はストリームデータをSparkで利用するRDDと同じ扱いができるようにDStreamというデータ機構を提供しています。
これは実装上ではほとんどRDDとは別物なんですがRDDに対する操作とほぼ同じことができるようになっています。

最後は定期間隔でJobを実行するための仕組みです。
とりあえず概要はここまで。

リアルタイムログ検索エンジンを考えてみる

今回はSpark Streamingの動作イメージとして全文検索エンジンを考えてみます。
DBにあるものを検索するのではなく、ログにSQLを吐き出してそれをストリーミング処理してElasticSearchに突っ込みます。
イメージは以下の感じです。

f:id:POCHI_BLACK:20141212010130p:plain

今回のアプリは簡単にElixirで書いたものCRUD時にSQLログを出力しています。
Fluentdについては説明する必要はないかとおもいます、今回はfluent-kafkaプラグインを利用してKafkaに書き出しています。
それではSpark Streamingの実装を見ていきます。

実装してみる

最初に今回書いたコードをGithubにあげてますので気になる方はみてみてください。

pochi/spark-streaming-kafka-sample
書いたソースコードを早速見てみます。

package pochi.spark.streaming

import org.apache.spark._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.spark.sql._
import org.elasticsearch.spark.sql._

import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}

object Elastic {
  def main(args: Array[String]) {
    val numThreads = 1
    val zookeeperQuorum = "localhost:2181"
    val groupId = "test"

    val topic = Array("test").map((_, numThreads)).toMap
    val elasticResource = "apps/elixir_blog"

    val sc = new SparkConf()
                 .setMaster("local[*]")
                 .setAppName("Elastic Search Indexer App")

    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("checkpoint")

    val logs = KafkaUtils.createStream(ssc,
                                        zookeeperQuorum,
                                        groupId,
                                        topic,
                                        StorageLevel.MEMORY_AND_DISK_SER)
                          .map(_._2)

    logs.foreachRDD { rdd =>
      val sc = rdd.context
      val sqlContext = new SQLContext(sc)
      val log = sqlContext.jsonRDD(rdd)
      log.saveToEs(elixirElasticResource)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

まず、scのとこまではSparkとかわりませんが、その後にStreamingContextを作っています。
第二引数には間隔を渡してあげます。

その後のval logs...ではKafkaからデータを取ってくるコネクタ部分を記載しています。
これでfluentdから送られてKafkaに保存されているデータが取得できます。

その後は各ログをElasticSearchに保存しています。
ここでSparkSQLを利用しているのはFluentdから送られてきたjsonをElasticSearchに入れるときに
json構造体と認識させるために利用しています。

実行するには以下のどちらかを実行します。

$ sbt run

or

$ sbt package
$ spark-submit --class Elastic target/scala-2.10/elastic-assembly.jar

実行すると10秒間隔でログをElasticSearchにいれていることが分かります。

最後に

非常に簡単にSpark Streamingを利用することができました。
とはいえこれを試したときは日本語記事もないのでソースコード読みながら進めていき結構時間がかかってしまいました。
Scalaを読むのもほとんど初めてだったのでpom.xmlの依存性解決のところとかとても苦戦しました><
この記事がSpark Streamingが利用される一助になればうれしいです。

AngularJSの素晴らしさを整理してみる

久々のブログになります。
3年ほどJavaScriptを利用した開発をしてきましたが、最初はなぜJavaScriptMVCフレームワークが必要なのかいまいちわからず、いろいろ試行錯誤してきました。
今日はタイトルの通りAngularJSが素敵だなということを書きたいのですが、よくあるベストプラクティスみたいなものではなく、自身がどのように思考回路を経由してここに行き着いたのかという経験談を記しておきたいと思います。

ちなみに私のJavaScript力は高くありませんのであしからず・・・。

jQuery

昔々、JavaScript MV(V)Cみたいなものがなかった時代、JavaScriptでリッチなものを作ろうとなるとjQueryやDojoのような低レイヤーのフレームワークが主に利用されていました。
その当時簡単なシングルページアプリケーションを作っていましたが私はjQueryを採用しました。
jQueryはユーザイベントを拾ったり、サーバとの通信をサポートしているので、私は以下の思考回路でアプリを開発してました。

  • イベントを拾う
  • $.ajaxを利用してサーバと通信する
  • addClassやappend(html)を利用してViewを変更する

思考回路は非常にシンプルなので開発はしやすかった記憶があります。しかし、今そのソースコードは保守性がかなり低い状態になってしまいました。その理由は以下の通りです。

  • 構造化されていないため、コードを追うのが難しい
    • コードを増やせば増やすほど見通しが悪くなる
  • テストを書く技術力がなく、簡単に変更ができない

開発当時はわかりましたが、今は理解するのが難しいです。
ということで動きはしましたが、とても納得のできるソースコードはかけませんでした。

Backbone.js

jQueryの開発の1年後くらいでしょうか。当時注目されていた(今もか)WebSocketを利用して本格的なチャットシステムを開発するチャンスをもらいました。
そのときはjQueryの経験があったこととBackbone.jsの登場まもなくということで運命を感じた私はすぐにBackbone.jsに飛びつきました。
最初はほとんどドキュメントがない状態で書き始めたので最初はとても苦労しました。
そのときは以下の思考回路で開発をしていました。

  • View,Collection,Modelの設計をする
  • イベントを拾う(Viewのbind)
  • Modelを利用してサーバと通信する
  • Modelの変更をViewに適用する

Backbone.jsを利用したおかげでソースコードを構造化できるようになりましたが、設計の手間が増えました。
ViewにCollectionを持たしてさらにその一つ一つがModelを持つというのもそれなりに手間がかかりました。

テストについては、Cucumber+Capybaraを利用してWebSocket含めたストーリーテストを書くことでアプリの
一貫性を保つことができました。
構造化とテストの一貫性を実現できて、保守性自体はjQueryより大幅にあがったと思います。
(これは特にBackboneに限った話ではなくjQueryでもCucumber+Capybaraでテスト書くことは可能でした。)

ただ、それでもBackbone.jsを利用した開発手法は何か納得できないものがありました。
それは以下のような点です。

  • JavaScript単体テストが書きづらかった
    • ModelとViewの依存性が高いと感じた
  • 実際のHTMLとView側のbindが直感的ではなかった
    • ViewでjQueryセレクタのような形で記述するが具体的にどこを示しているのか直感的にわからなかった
  • Modelの変更をViewに伝えるのにModelの中でViewを呼び出していた
    • 当時でも他にいい方法があったのかもしれませんが、その当時は少なくともlistenToのような仕組みはなかったです。

ということでもう少し経験を積めば慣れていたかもしれませんが、満足はできないなーという想いでした。

AngularJS

そして今年に入ってAngularJSを利用し始めました。
RedmineGoogleカレンダーライクなプラグインを作っています。
Angualarの概要は一部ドキュメントの翻訳もしましたのでよければ見てみてください。
私は最初Angularを書いていて正直ちんぷんかんぷんでした。
抽象化レベルが高く、自由度が高すぎたのが原因でした。しかしすこしずつ書き直していくと以下の手順でかけるようになりました。

  • HTMLにAngularでbindすべきイベントを記述する
  • AngularのControllerでイベントを受け取る
  • VIewの変更ないしサーバとの通信をModelのビジネスロジックに記述する
  • Viewを変更する

この手順はとてもきれいに書けて開発がテンポよく行えるようになりました。
ここまできてやっと気づけたのですがこのAngularの手順はRailsで書くアプリと似ています。
Railsの場合、私は以下のような手順で開発しています。

  • HTMLを書く
  • formのsubmitやリンク等のイベントをサーバ側でControllerが受け取る
  • Controllerは必要な処理をModelのビジネスロジックに依頼する
  • Viewを返す

Angularで書くと以下のように記述できます。

  • HTMLを書く
  • ng-clickやng-mouseover等のイベントをAngularのController(scope)が受け取る
  • Controllerは必要な処理をModelのビジネスロジックに依頼する
  • Viewを返す

例えばカレンダーにイベントを作るとき、Controllerは以下のように書いています。

scope.createEvent = function() {
   // エラーの時の振る舞いを書く
   var error = function(response) {
     console.log(response);
     scope.showNotification("ライセンス数の上限に引っかかっています");
    };

   // 保存に成功したときの振る舞いを書く
   var success = function(event) {
     scope.myCalendar.fullCalendar("renderEvent", event,  true);
     scope.last_update_event = event;
   };

   // モデルに保存を依頼
   scope.formEvent.create(success, error);    
   scope.closeEventForm();
};

まさにRailsのControllerみたいな書き方ができてます。
他にもViewとモデルの双方向バインディングとかテストのしやすさとかdirectiveとかまだまだ書きたいことはありますが、
今回は力つきたのでここまでです。
とにかくjQuery,Backbone,Angularを触ってみましたが今のところAngularに夢中になる予感なのです。

DBのインデックスメモ

それなりに件数の多いデータをRDBで扱うと問い合わせするカラムにはindexをつけると性能が大幅に向上します。

そんなときバッチなどである特定のカラムにwhere句を設定するとき複合インデックスをつけると性能がさらによくなります。
付け方は以下のようなコマンドです。

CREATE INDEX table_name ON index_name (column1,column2...)

このときにカーディナリティの高いカラムから設定すると効率がさらによくなります。

また、RDBを利用するとDISKIOがネックになるケースも多いですがその辺りをLINUXで見るコマンドも備忘録として残しておきます。

iostat -x 10

これで出てきたデバイスごとの一番右の項目がDISK使用率になります。ここが高いとRDBのクエリやらを改善しないといけなくなってきます。

ということで最近LINUX環境でリソースみるときはCPU,メモリ,DISKIO(WEBアプリの場合ネットワークのIOも)を見るようにしてますがdstatで一括して見るには次のコマンドが便利です。

dstat -Tclm --disk-util

OS全体のリソースを把握しつつクエリが遅ければEXPLAINとINDEXを使って高速化していくことが重要だと分かりつつ実践しないとできないなーとも思いました。

SilentCloseに対する対応について

概要

WebSocketを利用した場合、長時間接続していると知らない内に接続がきれていることがあります。
詳細はこのあたりが詳しいです。
僕の環境でもSilentCloseに多分なったので事象と対応をまとめておこうと思います。

SilentCloseがおこるとどうなるか

SilentCloseが発生した場合、上記のリンクにもありますが以下のようなことが発生します。

  • JavaScriptのonerror, oncloseは発生しない
  • サーバ側でのonerror, oncloseも発生しない
  • サーバ側でnetstatすると接続はESTABLISHEDになっている
SilentCloseの問題点

これが発生すると実際にメッセージの送受信ができないこと以外はすべて正常だと認識されます。
僕の環境だとサーバからメッセージを送ってもなにもエラーが発生しませんでした。
なので実際にメッセージが送れなくなってからメッセージはおくれない、システムは気づけないという大きな問題が発生してしまいます。

どう解決するか

上記のリンクを参考にすると以下のような解決策があるようです。

  • 一定時間で接続をリセットする
  • 定期的にPing-Pongリクエストのやり取りを行う

接続をリセットする場合ある一定時間で接続がきれてから再接続するまでメッセージの送受信ができないのが難点です。どのNATやFireWallがすべて同一の時間できってくれるならいいですが一概にもそうはいえないのかと思います。(確証はないですが。。。)

ということで僕の環境では定期的にPing-Pongで実装するようにしました。

ただ、小松さんの発言の通りkeep-aliveで解消されてJavaScriptレベルで意識しなくてよくなるのが一番ですね。

どう実装したか

まずは、Client側でPing-Pong管理の配列を用意します。

var heartbeats = [];

次にOnOpenの際に定期的にメッセージ送受信するsetIntervalを定義します。

var socket = new WebSocket(url);
socket.onopen = function() {
  setInterval(function() {
    if (heartbeats.length() >= 2) {
      reconnect();                  // 前回のPingリクエストがかえってきていない再接続処理を実装する
    }
    current_time = new Date().toString();
    heartbeats.push(current_time); // Queueに追加
    socket.send(current_time);        // Pingリクエスト
  }, 10000)
  // 実際にやりたいOnOpen処理
};

最後にOnMessageでPingのレスポンスを処理する。

socket.onmessage = function(event) {
  var parse_data = event.data;
  ping_data = __indexOf.call(self.heartbeats, parse_data) >= 0; // Pingレスポンスか判断
  if (ping_data) {
    // Queueから取り除く
    return self.heartbeats = self.heartbeats.filter(function(e) {
    return e !== parse_data;
    });
   } else {
     // Pingレスポンスでなければ本来の処理へ
     return self.onmessage(event);
   }
};

WebSocket備忘録

社内でWebSocketに関する勉強会をやってきました。今回WebSocket自身を調べ直したことでだいぶ自分の勉強になりました。
普段アプリケーションを書く上であまり気にならないですがプロトコルの中身を知っておくことは重要であまりその辺りを書いている人もいなかったので書いておこうと思います。
詳しくは RFC6455日本語訳に書いてます。

WebSocketとは

ステートフルなプロトコルで、一度接続を行うとその後はサーバからでもクライアントからでも好きなタイミングでデータのやり取りを行うことができる。Googleさん提唱でだいたいどのブラウザからでも利用可能。

WebSocket接続手順

接続確立については最初はHTTPを利用する。
これはファイアウォールやプロキシなど既存のHTTP環境との親和性を持つためにやっている。
またHTTPと同様にTLSを利用した仕組みでセキュアな接続も可能である。

RFC6455の例を引用すると最初のクライアントからのリクエストヘッダは以下のようなものが送られる。(注釈は自分で勝手に追加したもの)

GET /chat HTTP/1.1
# HTTPのバージョン、パス、メソッド
Host: server.example.com
# ホスト名
Upgrade: websocket
# 下記のConnectionヘッダにあるUpgrade要求でwebsocketにUpgradeしてねといっている
Connection: Upgrade
# 接続のWebSocket切り替えを依頼
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
# クライアントはこのキーをランダムに生成する。サーバからのSec-WebSocket-Acceptから複合化して正しい自分へのレスポンスであることを確認する
Origin: http://example.com
# Originヘッダはブラウザから自動で追加される。サーバ側はWebSocket接続していいものかどうか判断するときに利用する
Sec-WebSocket-Protocol: chat, superchat
# WebSocket上でカスタマイズされたプロトコルを利用する場合クライアントは要求を送る。ただしサーバ上でこのカスタマイズされたプロトコルが利用できなければいけない
Sec-WebSocket-Version: 13
# WebSocketのバージョン。RFC6455の場合は13となる

このようなリクエストがくるとサーバ側で解釈して正しかった場合は以下のようなレスポンスを返す。ただし認証が必要な場合401を返すこともできる。

HTTP/1.1 101 Switching Protocols
# プロトコル変更ができましたというレスポンスコード
Upgrade: websocket
Connection: Upgrade
# WebSocketにUpgradeできたことが返る
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
# サーバ側でSec-WebSocket-Keyから暗号化した文字列を返す。クライアント側でこれを複合化して検証
Sec-WebSocket-Protocol: chat
# クライアントから送られてきたカスタムプロトコルに対応していたものを返す。これで接続後は'chat'というプロトコルでやり取りが行われる。

Sec-WebSocket-Key及びSec-WebSocket-Acceptに用いられるアルゴリズムは以下のとおり。

  • Sec-WebSocket-Keyに特定文字列を追加
  • SHA-1を利用してハッシュ値を取得
  • それをBase64符号化するとSec-WebSocket-Acceptになる。

これで接続完了となる。

接続が切れてしまった時の対応

もしクライアントもサーバも望んでいないタイミングで接続が切れてしまった場合、RFCを見る限り、クライアントからサーバに再度接続をすることが望ましいとされている。また何度も接続が走るとサーバ側への負荷がかかるかもしれないので少しずつ間隔をのばしつつ接続するとのこと。
Chroniumのソースコードをちょっと見たところその部分の実装がみつからなかったので後日試してみよう。

データのやり取り

一度接続確立されるとフレームという単位でデータのやり取りを行う。
データのやり取りにはHTTP Headerがつかないためネットワークの転送量が削減される。またWebSocketはデータの中身をテキストかバイナリどちらかで送ることができる。バイナリを利用した場合テキストに比べておおよそ30%転送量が削減される。
フレームの種類は4つのタイプがある。

  • Closeフレーム
  • Pingフレーム
  • Pongフレーム
  • Dataフレーム
Closeフレーム

接続終了を行うフレーム。
クライアント及びサーバどちらからでも送ることができる。ただしサーバから送信することが望ましいとされている。理由はサーバからCloseフレームを送った場合TCP接続がTIME_WAIT状態の時にその接続が再利用されないからである。

Pingフレーム

クライアントとサーバの間で接続維持できているか確認する。
一般的Pingは一度送るとすぐに結果がかえってくるがWebSocketでは早急に返すのは必須ではないとされている。(望ましいとはかいてある)HeartBeatの代わりになり得るものだと理解している。

Pongフレーム

PongフレームはPingフレームの応答用フレーム。
ただしPongフレームはPingフレームがこないと送れないものではない。
すなわち一方方向のリクエスト送信に利用することができる。

Dataフレーム

Dataフレームはアプリケーション間でのデータのやり取りに利用する。
テキストないしバイナリでの送信が可能。

ベースフレーミング

RFCを見ると以下のようなフレームになっている。

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
  • FIN: このフレームがデータの終わりなのかどうかを示す
  • MASK: データがマスク化されているかどうかを示す(クライアントからサーバに送られる場合必ずマスク化される)
  • Payload len: アプリケーションデータの長さを示す
  • Masking-key: マスク化している場合キーデータを置く
  • Payload Data: アプリケーションデータ

クライアントからサーバへのデータのマスキングについて

クライアントからサーバへデータを転送する際必ずデータのマスキングが行われる。手法はシンプルでデータとマスキングキーのXORで表される。フレームにマスキングキーが入っていることからわかるようにこのフレームを盗まれるとデータは簡単に復元されてしまう。
これは元々このマスク化はデータの中身をセキュアにするわけではなくプロキシ汚染による攻撃を防ぐためのマスク化だからである。なのでデータをセキュアにする場合違う方法をとるべきである。

TCPパケットとの違い

http://msdn.microsoft.com/ja-jp/magazine/jj863133.aspx
に書いてます。とてもわかりやすくていい記事だと思います。