Spark Streaming for Kafka
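Spark is widely known, so I won't introduce it here. This post shows how to process data with Spark Streaming: Kafka receives the incoming data and stores it in a topic, Spark Streaming reads it back through its Kafka API, and the processed results are saved to a database.
First, set up the Kafka and ZooKeeper services. Then write a program that uses Spark Streaming to consume data from Kafka.
Log data is first sent to the Kafka cluster. Spark Streaming then pulls it out for ETL or computation and stores whatever needs to be kept in a database or a NoSQL store.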
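To make this flow concrete without a running cluster, here is a minimal in-memory sketch, assuming a Queue stands in for the Kafka topic and a Map stands in for the database. PipelineSketch, the "id,message" line format and all method names are hypothetical; no Kafka or Spark APIs are involved.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: the Queue plays the Kafka topic, the Map plays the DB.
object PipelineSketch {
  val topic = mutable.Queue[String]()
  val store = mutable.Map[Int, String]()

  // Producer side: a log line is appended to the "topic".
  def send(line: String): Unit = topic.enqueue(line)

  // Consumer side: drain up to maxRecords lines as one batch.
  def pollBatch(maxRecords: Int): List[String] = {
    val batch = mutable.ListBuffer[String]()
    while (topic.nonEmpty && batch.size < maxRecords) batch += topic.dequeue()
    batch.toList
  }

  // ETL step: keep only well-formed "id,message" lines (hypothetical format).
  def etl(batch: List[String]): List[(Int, String)] =
    batch.flatMap { line =>
      line.split(",", 2) match {
        case Array(id, msg) if id.trim.nonEmpty && id.trim.forall(_.isDigit) =>
          Some(id.trim.toInt -> msg.trim)
        case _ => None // malformed lines are dropped
      }
    }

  // One consume -> transform -> store cycle.
  def runOnce(maxRecords: Int): Unit = store ++= etl(pollBatch(maxRecords))
}
```

Malformed lines are dropped in the ETL step rather than failing the whole batch, which is the same choice the Spark code makes later by falling back to default field values.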
The architecture diagram is shown below:
GitHub source code
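The example in this post is written in Scala: Spark Streaming collects the data from a topic on the Kafka cluster, applies ETL, and stores the result in Elasticsearch.
Create an SBT project and add the following dependencies to the sbt build: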
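// Library dependencies
// Assumption: Scala 2.10 (matching the _2.10 artifacts) and Spark 1.6.2 (matching the Spark docs in the References)
scalaVersion := "2.10.6"
val sparkVers = "1.6.2"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % sparkVers,
  "org.apache.spark" % "spark-sql_2.10" % sparkVers, // needed for SQLContext and DataFrame
  "org.apache.spark" % "spark-streaming_2.10" % sparkVers,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVers,
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.3",
  "com.typesafe.play" % "play-json_2.10" % "2.4.5"
)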
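Each Kafka message carries the fields id, name, age and address (encoded as JSON, as the parsing code further down shows). Spark Streaming parses each message string in the RDD and converts it into a Row; the rows are then turned into a DataFrame, and a DataFrame filter selects the records to store in Elasticsearch.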
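The per-message conversion can be sketched in plain Scala. This is a simplified model, assuming the decoded JSON object is represented as a Map[String, Any] (the real code works on a Play JSON JsValue); Member and MemberParser are hypothetical names. Missing or wrongly typed fields fall back to 0 or "", mirroring the defaults used in the streaming code.

```scala
// Hypothetical record type for one Kafka message.
final case class Member(id: Int, name: String, age: Int, address: String)

object MemberParser {
  // Return the field if it is present and an Int, otherwise 0.
  private def intField(m: Map[String, Any], key: String): Int =
    m.get(key) match {
      case Some(i: Int) => i
      case _            => 0
    }

  // Return the field if it is present and a String, otherwise "".
  private def stringField(m: Map[String, Any], key: String): String =
    m.get(key) match {
      case Some(s: String) => s
      case _               => ""
    }

  def toMember(m: Map[String, Any]): Member =
    Member(intField(m, "id"), stringField(m, "name"), intField(m, "age"), stringField(m, "address"))
}
```

For example, a message with only id 2 and a non-numeric age becomes Member(2, "", 0, ""), so bad input is kept with defaults rather than rejected.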
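Create the SparkConf and set the Elasticsearch node address.
Note: the Elasticsearch server needs to be on the same LAN, and the connection cannot go through the Elasticsearch transport client; the elasticsearch-spark connector talks to Elasticsearch over HTTP (REST) on the port set in es.port.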
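import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

class SparkConfig {
  def SparkConfToES = new SparkConf()
    .setAppName("spark streaming kafka")
    .setMaster("local[2]") // a receiver-based stream needs >= 2 local threads: one receives, one processes
    .set("spark.serializer", classOf[KryoSerializer].getName)
    .set("es.nodes", "192.168.1.33") // Elasticsearch node address
    .set("es.port", "9200")          // Elasticsearch HTTP (REST) port
}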
Add a Main class that creates the StreamingContext and sets the ZooKeeper connection parameters:
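import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// SparkConfToES comes from SparkConfig (assumes Main extends SparkConfig)
val conf = SparkConfToES
val sc = new SparkContext(conf)
// batch interval in seconds
val batchInterval = 15
val ssc = new StreamingContext(sc, Seconds(batchInterval))
val sqlContext = new SQLContext(sc)
// Kafka consumer group
val group_id = "receiveScanner"
// Kafka topic -> number of consumer threads
val topic = Map("testStreaming" -> 1)
// ZooKeeper connection settings (2181 is ZooKeeper's default client port)
val zkParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> group_id)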
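Spark Streaming cuts the incoming stream into micro-batches, one per batch interval (15 seconds here), and the function passed to foreachRDD runs once per batch. A simplified sketch of that slicing, assuming arrival times given in milliseconds since the context started; BatchSketch is hypothetical, and Spark's real batch boundaries are driven by its internal timer rather than this formula.

```scala
object BatchSketch {
  val batchIntervalSec = 15

  // Which batch a record arriving at arrivalMs falls into.
  def batchIndex(arrivalMs: Long): Long = arrivalMs / (batchIntervalSec * 1000L)

  // Group (arrivalMs, payload) pairs by batch, keeping arrival order inside each batch.
  def toBatches(arrivals: Seq[(Long, String)]): Map[Long, Seq[String]] =
    arrivals
      .groupBy { case (t, _) => batchIndex(t) }
      .map { case (idx, recs) => idx -> recs.map(_._2) }
}
```

A longer interval means fewer, larger batches (and fewer Elasticsearch writes) at the cost of higher end-to-end latency.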
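What the following code does: every 15 seconds Spark Streaming takes the messages consumed from Kafka, parses each one with Play JSON, extracts the fields into an RDD[Row], converts the RDD[Row] into a DataFrame, uses Spark SQL to keep only the records with age under 20, sorts them by age, and writes them to the Elasticsearch NoSQL store.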
import kafka.serializer.StringDecoder
import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import play.api.libs.json.Json

// Kafka: receiver-based stream that tracks offsets in ZooKeeper
val kafkaConsumer = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, zkParams, topic, StorageLevel.MEMORY_ONLY_SER)
// keep only the message value; the key is not used
val receiveData = kafkaConsumer.map(_._2)
// print the first records of each batch to the driver log
receiveData.print()

receiveData.foreachRDD { rdd =>
  val transform = rdd.map { line =>
    val data = Json.parse(line) // Play JSON parse
    // fall back to 0 / "" when a field is missing or has the wrong type
    val id      = (data \ "id").asOpt[Int].getOrElse(0)
    val name    = (data \ "name").asOpt[String].getOrElse("")
    val age     = (data \ "age").asOpt[Int].getOrElse(0)
    val address = (data \ "address").asOpt[String].getOrElse("")
    Row(id, name, age, address)
  }
  val transformReceive = sqlContext.createDataFrame(transform, schameType)

  import org.apache.spark.sql.functions._
  import org.elasticsearch.spark.sql._
  // keep age < 20, sort by age, write to index "member" / type "user" using "id" as the document _id
  transformReceive.where(col("age") < 20).orderBy(col("age").asc)
    .saveToEs("member/user", Map("es.mapping.id" -> "id"))
}

// start receiving and block until the job is stopped
ssc.start()
ssc.awaitTermination()
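What one foreachRDD call does with a batch can be modeled on plain collections: keep age < 20, sort ascending by age, then write each record keyed by id. In this sketch a mutable Map stands in for the Elasticsearch index (an assumption for illustration); writing an existing id replaces the stored document, which is how a write with es.mapping.id -> "id" behaves under the connector's default "index" write operation. User and BatchWriter are hypothetical names.

```scala
import scala.collection.mutable

final case class User(id: Int, name: String, age: Int, address: String)

object BatchWriter {
  // Stand-in for the "member/user" index, keyed by document id.
  val index = mutable.Map[Int, User]()

  // Same selection as where(col("age") < 20).orderBy(col("age").asc).
  def selectForIndex(batch: Seq[User]): Seq[User] =
    batch.filter(_.age < 20).sortBy(_.age)

  // Upsert every selected record by id and return what was written.
  def writeBatch(batch: Seq[User]): Seq[User] = {
    val selected = selectForIndex(batch)
    selected.foreach(u => index(u.id) = u)
    selected
  }
}
```

Because writes are keyed by id, replaying the same Kafka messages (for example after a restart) overwrites documents instead of creating duplicates.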
DataFrame schema
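import org.apache.spark.sql.types._

// Field order and types must match Row(id, name, age, address) built in the stream
def schameType = StructType(
  StructField("id", IntegerType, false) ::
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("address", StringType, false) :: Nil
)
} // closes the enclosing Main object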
References
https://spark.apache.org/docs/1.6.2/streaming-kafka-integration.html