Apache Spark 是一个高性能集群计算框架,其中 Spark Streaming 作为基于微批处理(micro-batch)的准实时流式计算组件,因为其简单易上手的特性深受喜爱。在 es-hadoop 2.1.0 版本之后,也新增了对 Spark 的支持,使得结合 ES 和 Spark 成为可能。
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.1.0.Beta4.zip
import org.apache.spark._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.elasticsearch.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/**
 * Consumes JSON log messages from Kafka via Spark Streaming and indexes
 * each micro-batch into Elasticsearch using the es-hadoop Spark SQL support.
 *
 * Pipeline: Kafka (topic "test") -> 10s micro-batches -> jsonRDD parse -> saveToEs.
 */
object Elastic {
  def main(args: Array[String]) {
    // Kafka receiver settings: one consumer thread on the "test" topic.
    val numThreads = 1
    val zookeeperQuorum = "localhost:2181"
    val groupId = "test"
    // KafkaUtils.createStream expects Map[topic -> receiver thread count].
    val topics = Array("test").map((_, numThreads)).toMap
    // Target Elasticsearch resource: index "apps", type "blog".
    val elasticResource = "apps/blog"

    // Named sparkConf (not "sc") so it cannot be confused with a SparkContext;
    // the original code shadowed "sc" inside foreachRDD.
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")
    // Let es-hadoop create the target index automatically if it is missing.
    sparkConf.set("es.index.auto.create", "true")

    // 10-second micro-batches; checkpoint dir is required for receiver recovery.
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("checkpoint")

    // Create the SQLContext ONCE on the driver instead of once per batch
    // (foreachRDD runs on the driver, so the original allocated a new
    // SQLContext every 10 seconds for no benefit).
    val sqlContext = new SQLContext(ssc.sparkContext)

    // Each Kafka record is a (key, message) pair; keep only the message body.
    val logs = KafkaUtils.createStream(ssc,
                                       zookeeperQuorum,
                                       groupId,
                                       topics,
                                       StorageLevel.MEMORY_AND_DISK_SER)
      .map(_._2)

    // For every micro-batch: parse the JSON strings and index them into ES.
    logs.foreachRDD { rdd =>
      // NOTE(review): jsonRDD is deprecated from Spark 1.4 onward; on newer
      // versions prefer sqlContext.read.json(rdd). Kept here because the
      // tutorial targets Spark 1.0.x with es-hadoop 2.1.0.Beta4.
      val log = sqlContext.jsonRDD(rdd)
      log.saveToEs(elasticResource)
    }

    ssc.start()
    ssc.awaitTermination() // block until the streaming job is stopped/killed
  }
}