Spark Streaming for Kafka Data Stream Integration


Most readers already know the Spark stack, so it needs no introduction. This post shows how to process data with Spark Streaming: incoming data is received and stored in Kafka, then fetched through the Spark Streaming API, and after processing the results are saved to a database.

First, set up the Kafka and ZooKeeper services; after that, write a program that uses the Spark Streaming package to consume the data from Kafka.
 
Log data is first sent to the Kafka cluster; Spark Streaming then pulls it out for ETL or computation, and whatever needs to be kept is written to a DB or NoSQL store.
The architecture is shown below:

[Architecture diagram: log source → Kafka cluster → Spark Streaming (ETL) → DB / NoSQL]

GitHub Source Code

Today's example is written in Scala: Spark Streaming collects the data from a Kafka cluster topic, runs ETL on it, and stores the result in Elasticsearch.

Create an SBT project and add the following dependencies to build.sbt (the sparkVers value and the Scala version are covered in the sketch after the list):

// Library dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % sparkVers,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVers,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVers,
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.3",
  "com.typesafe.play" % "play-json_2.10" % "2.4.5"
)
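
The artifacts above are compiled for Scala 2.10 and reference a sparkVers value that build.sbt has to define. A minimal sketch of those settings, with assumed version numbers (the Spark version matches the 1.6.2 integration guide referenced at the end of this post):

// Assumed versions: any Scala 2.10.x and a matching Spark 1.6.x release should work.
scalaVersion := "2.10.6"
val sparkVers = "1.6.2"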


Each Kafka message carries the fields id, name, age, and address. A Spark Streaming transformation parses each string and turns every record into a Row; the resulting DataFrame is then filtered so that only the needed data is written to Elasticsearch.
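
A record on the topic is expected to be a JSON object with those four fields; a made-up example message:

{"id": 1, "name": "Jack", "age": 18, "address": "Taipei"}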


Create the SparkConf and set the Elasticsearch node IP.
Note: the Elasticsearch server has to be on the same LAN as the Spark job; the connection cannot go through the transport client.
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

class SparkConfig {
  // SparkConf that tells the elasticsearch-spark connector where the cluster lives.
  def SparkConfToES = new SparkConf()
    .setAppName("spark streaming kafka")
    .setMaster("local")
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("es.nodes", "192.168.1.33")
    .set("es.port", "9200")
}
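
The same-LAN setup is what this example assumes. If the driver can only reach Elasticsearch through a single declared address (for example across networks or NAT), elasticsearch-hadoop has an es.nodes.wan.only option that limits the connector to the nodes listed in es.nodes; a sketch of that variant (the method name and address are placeholders):

// Hypothetical variant for when only the declared Elasticsearch node is reachable.
def SparkConfToRemoteES = new SparkConf()
  .setAppName("spark streaming kafka")
  .setMaster("local")
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("es.nodes", "192.168.1.33")      // placeholder address
  .set("es.port", "9200")
  .set("es.nodes.wan.only", "true")     // route all requests through the declared nodes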



Next, create a main class, build the StreamingContext, and configure the ZooKeeper connection for the Kafka consumer.
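
The statements below are assumed to live inside the main method of an object that extends SparkConfig, so that SparkConfToES resolves; a hypothetical skeleton (the object name is an assumption):

// Hypothetical layout; the object name is made up.
object KafkaStreamingToES extends SparkConfig {
  def main(args: Array[String]): Unit = {
    // SparkContext / StreamingContext / Kafka setup (next snippet),
    // the foreachRDD processing block, and ssc.start() go here.
  }
}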
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import play.api.libs.json.Json

    val conf = SparkConfToES
    val sc = new SparkContext(conf)
    // Micro-batch interval in seconds.
    val batchInterval = 15
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    val sqlContext = new SQLContext(sc)
    // Kafka consumer group
    val group_id = "receiveScanner"
    // Kafka topic and number of receiver threads
    val topic = Map("testStreaming" -> 1)
    // ZooKeeper connection settings
    val zkParams = Map(
      "zookeeper.connect" -> "localhost",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> group_id)



The following code reads the Kafka consumer data every 15 seconds, parses each message with Play JSON, turns the parsed records into an RDD[Row], converts that RDD[Row] into a DataFrame, uses Spark SQL to keep only the rows where age is below 20, sorts them by age, and writes the result to Elasticsearch.


    // Create a receiver-based Kafka input stream; messages arrive as (key, value) pairs.
    val kafkaConsumer = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, zkParams, topic, StorageLevel.MEMORY_ONLY_SER)
    val receiveData = kafkaConsumer.map(_._2)
    // Print the raw Kafka messages for debugging.
    receiveData.print()
    receiveData.foreachRDD { rdd =>
      val transform = rdd.map { line =>
        // Parse each message with Play JSON, falling back to defaults for missing fields.
        val data = Json.parse(line)
        val id = (data \ "id").asOpt[Int].getOrElse(0)
        val name = (data \ "name").asOpt[String].getOrElse("")
        val age = (data \ "age").asOpt[Int].getOrElse(0)
        val address = (data \ "address").asOpt[String].getOrElse("")
        Row(id, name, age, address)
      }
      val memberDF = sqlContext.createDataFrame(transform, schemaType)
      import org.apache.spark.sql.functions._
      import org.elasticsearch.spark.sql._
      // Keep rows with age < 20, sort by age, and write them to the member/user index,
      // using the id field as the Elasticsearch document id.
      memberDF.where(col("age") < 20).orderBy(col("age").asc)
        .saveToEs("member/user", Map("es.mapping.id" -> "id"))
    }
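
The block above only defines the processing; the streaming job does not run until the context is started, so the end of the main method should start the context and wait:

    // Start receiving data and block until the job is stopped.
    ssc.start()
    ssc.awaitTermination()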



DataFrame schema definition:

  // Schema matching the fields parsed from each Kafka message.
  def schemaType = StructType(
    StructField("id", IntegerType, false) ::
      StructField("name", StringType, false) ::
      StructField("age", IntegerType, false) ::
      StructField("address", StringType, false) ::
      Nil)



References
https://spark.apache.org/docs/1.6.2/streaming-kafka-integration.html