Spark + Kafka: a small example


(1) Download the Kafka jars

http://kafka.apache.org/downloads
Spark 2.1 supports Kafka jars from 0.8.2.1 upward; I am on Spark 2.0.2 and downloaded kafka_2.11-0.10.2.0.
(2) Consumer code
package com.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object SparkStreamKaflaWordCount {
  def main(args: Array[String]): Unit = {
    // create the StreamingContext with a 4-second batch interval
    val conf = new SparkConf().setMaster("spark://192.168.177.120:7077")
      .setAppName("SparkStreamKaflaWordCount Demo")
    val ssc = new StreamingContext(conf, Seconds(4))
    // topics to subscribe to
    // val topic = Map("test" -> 1)  // old 0.8-style API that also took a ZooKeeper address
    val topic = Array("test")
    // consumer group
    val group = "con-consumer-group"
    // consumer configuration
    val kafkaParam = Map(
      // brokers used to bootstrap the initial connection to the cluster
      "bootstrap.servers" -> "192.168.177.120:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // identifies which consumer group this consumer belongs to
      "group.id" -> group,
      // used when there is no initial offset, or the current offset no longer
      // exists on the server; "latest" resets to the newest offset
      "auto.offset.reset" -> "latest",
      // if true, the consumer's offsets are committed automatically in the background
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // create the direct DStream; each element is a ConsumerRecord
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topic, kafkaParam))
    stream.map(s => (s.key(), s.value())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
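The object is named a word count, but as written it only prints each record's key and value. A minimal sketch of an actual per-batch word count on the same stream (standard DStream operations, no extra imports needed):

// split each record's value into words and count occurrences within each batch
val words = stream.flatMap(record => record.value().split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()

Also, since enable.auto.commit is false, nothing in this demo ever commits offsets back to Kafka. If you need that, the kafka010 integration exposes commitAsync; this sketch additionally assumes the imports org.apache.spark.streaming.kafka010.{HasOffsetRanges, CanCommitOffsets}:

stream.foreachRDD { rdd =>
  // collect the offset ranges consumed in this batch and commit them asynchronously
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}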
(3) Start ZooKeeper
(I already have the ZooKeeper environment variables configured.)
zoo1.cfg configuration:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/zhangxs/datainfo/developmentData/zookeeper/zkdata1
# the port at which the clients will connect
clientPort=2181
server.1=zhangxs:2881:3881
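(Note: with only one server.N entry ZooKeeper falls back to standalone mode, but if you grow this into a real ensemble, each node also needs a myid file in its dataDir whose content matches its server.N id, e.g. echo 1 > /home/zhangxs/datainfo/developmentData/zookeeper/zkdata1/myid.)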
Start the ZooKeeper service:
zkServer.sh start zoo1.cfg
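You can confirm it came up with zkServer.sh status, which should report the server's mode (standalone for this single-server config).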
(4) Start the Kafka service


[root@zhangxs kafka_2.11]# bin/kafka-server-start.sh config/server.properties
[2017-03-25 18:42:03,153] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 0.10.2-IV0
leader.imbalance.check.interval.seconds = 300
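The subscribed topic test does not have to be created by hand: the startup log above shows auto.create.topics.enable = true, so the first produced message creates it. To create it explicitly instead (standard Kafka 0.10 CLI, assuming ZooKeeper is on the same host at the clientPort configured above):

bin/kafka-topics.sh --create --zookeeper 192.168.177.120:2181 --replication-factor 1 --partitions 1 --topic test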
(5) (Open a new terminal) Start the producer process
[root@zhangxs kafka_2.11]# bin/kafka-console-producer.sh --broker-list 192.168.177.120:9092 --topic test
(6) Package the code into a jar named 【streamkafkademo】 and put it under spark_home/jars/.
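(How you produce the jar is up to you; as one option, assuming an sbt project rather than an Eclipse export, a minimal build.sbt like the following lets sbt package emit the jar under target/scala-2.11/:)

name := "streamkafkademo"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.2"
)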
 
(7) Submit the Spark application (the consumer program)
./spark-submit --class com.sparkstreaming.SparkStreamKaflaWordCount  /usr/local/development/spark-2.0/jars/streamkafkademo.jar 10
(8) Type data into the producer terminal
zhang xing sheng
(9) Printed result (the key is null because the console producer sends messages without a key):
17/03/25 19:06:36 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 99 on executor id: 0 hostname: 192.168.177.120.
17/03/25 19:06:36 INFO storage.BlockManagerInfo: Added broadcast_99_piece0 in memory on 192.168.177.120:35107 (size: 1913.0 B, free: 366.3 MB)
17/03/25 19:06:36 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 99.0 (TID 99) in 18 ms on 192.168.177.120 (1/1)
17/03/25 19:06:36 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 99.0, whose tasks have all completed, from pool
17/03/25 19:06:36 INFO scheduler.DAGScheduler: ResultStage 99 (print at SparkStreamKaflaWordCount.scala:34) finished in 0.019 s
17/03/25 19:06:36 INFO scheduler.DAGScheduler: Job 99 finished: print at SparkStreamKaflaWordCount.scala:34, took 0.023450 s
-------------------------------------------
Time: 1490439996000 ms
-------------------------------------------
(null,zhang xing sheng)
 
Problems encountered:
(1) While writing the consumer program in Eclipse I found that the KafkaUtils class was missing. That jar has to be downloaded separately and then added to the project's build path.
Maven:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.0</version>
</dependency>
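Note that org.apache.spark.streaming.kafka010.KafkaUtils is not in spark-streaming itself; it lives in the separate Kafka integration artifact, so that dependency is needed as well:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>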
Jar download:
http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%222.1.0%22
 
(2) When submitting the Spark application, a NoClassDefFoundError was thrown:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
at com.sparkstreaming.SparkStreamKaflaWordCount$.main(SparkStreamKaflaWordCount.scala:25)
at com.sparkstreaming.SparkStreamKaflaWordCount.main(SparkStreamKaflaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
------------------------------------------------------------------------
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$
at com.sparkstreaming.SparkStreamKaflaWordCount$.main(SparkStreamKaflaWordCount.scala:33)
at com.sparkstreaming.SparkStreamKaflaWordCount.main(SparkStreamKaflaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
The fix is to add these two jars, 【spark-streaming-kafka-0-10_2.11-2.1.0】 and 【kafka-clients-0.10.2.0】, to spark_home/jars/. (These are just the jars that happened to be missing for this project.)
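An alternative to copying jars by hand (a standard spark-submit mechanism, not what I used above) is to let spark-submit resolve the Kafka integration from Maven with --packages, which pulls in kafka-clients transitively:

./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0 --class com.sparkstreaming.SparkStreamKaflaWordCount /usr/local/development/spark-2.0/jars/streamkafkademo.jar 10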
