博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
scala spark(2.10)读取kafka(2.11_1.0.0)示例
阅读量:6429 次
发布时间:2019-06-23

本文共 2184 字,大约阅读时间需要 7 分钟。

1、pom加载jar包

org.apache.spark
spark-streaming_2.11
2.1.0
org.apache.kafka
kafka_2.11
1.0.0
org.apache.spark
spark-streaming-kafka-0-10_2.11
2.1.0
2、代码
package cn.piesat import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe import org.apache.spark.streaming.{Seconds, StreamingContext} object App {
private val brokers="hadoop01:9092" def main(args:Array[String]):Unit={
val spark=getSparkSession() val sc=spark.sparkContext val ssc=new StreamingContext(sc,Seconds(3)) val topics=Array("lj01") val kafkaParams=Map[String,Object]( "bootstrap.servers"->brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val messages=KafkaUtils.createDirectStream[String,String]( ssc, PreferConsistent, Subscribe[String,String](topics,kafkaParams) ) val lines=messages.map(x=>{
x.value() }) val wordCounts=lines.flatMap(x=>{
x.split(" ").map(x=>(x,1)) }).reduceByKey(_+_) wordCounts.print() ssc.start() ssc.awaitTermination() } def getSparkSession():SparkSession={
val spark=SparkSession .builder() .appName("sparkSql") .config("spark.some.config.option","some-value") .master("local[4]") .getOrCreate() spark } }

转载于:https://www.cnblogs.com/runnerjack/p/8604410.html

你可能感兴趣的文章
微信公众平台模拟群发技术
查看>>
C语言学习之指针详解
查看>>
学习使用Bing Maps Silverlight Control(一):准备和新建
查看>>
什么是Scrum
查看>>
nginx负载均衡的5种策略
查看>>
90%人都不知道:SVN 和 Git 的一些误解和真相
查看>>
防火墙配置十大任务之九,验证防火墙的运行
查看>>
【linux】浅谈Linux下的 find 指令
查看>>
CentOS 7 使用kubeadm 部署 Kubernetes
查看>>
我的友情链接
查看>>
透视美国大数据爆发全景
查看>>
java学习第一天1.2
查看>>
清空输入缓冲区的方法
查看>>
Yii2 项目优化小贴士
查看>>
UIScrollView的判断位置的属性如下:
查看>>
Applicatin Loader上传app步骤记录
查看>>
两种方法修改table表的内容
查看>>
张小龙莫慌 马化腾莫急 你们要好好的 微信还有时间
查看>>
一些常用软件静默安装参数(nsis,msi,InstallShield ,Inno)
查看>>
部署mimic版本的Ceph分布式存储系统
查看>>