c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)

摘要:
之前的同事已经提出了一个整体解决方案,并从技术上初步验证了蜂巢的性能。现在进入工程实践阶段,我们需要解决的第一个问题是首先实现链接服务程序,该程序用作Kafka的生产者,以生成消息并将其写入Kafka。因为链接服务程序是由c#编写的,所以我们需要成为一个Confluent Kafka。代理列表是Kafka的外部服务地址,主题是在Kafka上创建的主题。但是,当我们在官方服务器上测试时,我们未能发送超时错误:Local:Messagetimeout起初很混乱,很难创建防火墙、重新启动kafka、修改主机和其他配置。

最近项目上因为遇到数据量过大导致查询统计性能问题(oracle数据库、单表每月1亿多条车辆定位记录,由一个windows环境下的链路服务程序来接收车辆上传的定位数据写入oracle),急需使用大数据架构来解决。前期同事已经提出整体解决思路(修改链路服务程序,在写oracle的基础上同时写入kafka,之后再用etl工具从kafka保存到hive下)并从技术上初步验证了hive的性能。现在进入工程实操环节,我们需要解决的第一个问题就是先实现链路服务程序,作为kafka的producer产生消息写入kafka。

由于这个链路服务程序是c#写的,所以我们需要Confluent.Kafka来实现对kafka的操作。为了验证技术可走通,先做一个测试工程:

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第1张

通过nuget引入Confluent.Kafka,当前是1.5.0的版本,需要对应的framework版本是4.5。brokerlist是kafka的对外服务地址,topic是kafka上创建的主题(类比理解的话,kafka的主题就相当于是传统数据库的表,它把一类消息放在一个主题下,主题下可以有多个分区)。

测试代码很简单,两个按钮,一个按钮开始测试,构建消息、发送消息;一个按钮停止发送消息:

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第2张

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第3张

说一下我们遇到的坑:

测试程序在kafka所在机器上运行很正常,通过命令行可以正常看到发送过来的测试数据。但是当我们在正式服务器上测试的时候(测试程序与kafka服务不在一起,分别在两台服务器上),却死活发送不成功,报超时错误:Local:Message timed out

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第4张

刚开始一头雾水,搞防火墙、重启kafka、修改hosts等等各种配置不好使。后来搜到网上有帖子说,需要配置一下advertised.listeners详见:https://www.cnblogs.com/wangxinblog/p/7623419.html配置完成后,消息发送成功!

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第5张

c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)第6张

免责声明:文章转载自《c#使用Confluent.Kafka实现生产者发送消息至kafka(远程连接kafka发送消息超时的解决 Local:Message timed out)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Python+OpenCV图像处理之模糊操作Android 聊天室(一)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

javascript书籍集合

对于前端书籍的搜集,我是有强迫症的,就一个态度:我全都要! 下面是干货,关注公众号:撩撩前端,回复相应消息码,就可以获取对应电子书。 JavaScript.DOM高级程序设计 关注公众号"撩撩前端",回复消息:026557,获取电子书籍 JavaScript网页特效范例宝典 关注公众号"撩撩前端",回复消息:408289,获取电子书籍 JavaScript...

ByteBuf Netty的数据容器

两个组件 ByteBuf ByteBufHolder  使用模式 1.堆缓冲区 backing array模式 直接缓冲区 直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。 复合缓冲区 CompositeByteBuf 为了举例说明,让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议传输的消息。这两部分由应用程序的不同模...

网页实时聊天之PHP实现websocket

前言 websocket 作为 HTML5 里一个新的特性一直很受人关注,因为它真的非常酷,打破了 http “请求-响应”的常规思维,实现了服务器向客户端主动推送消息,本文介绍如何使用 PHP 和 JS 应用 websocket 实现一个网页实时聊天室; 以前写过一篇文章讲述如何使用ajax长轮询实现网页实时聊天,见链接: 网页实时聊天之js和jQuer...

Scala创建SparkStreaming获取Kafka数据代码过程

正文 首先打开spark官网,找一个自己用版本我选的是1.6.3的,然后进入SparkStreaming ,通过搜索这个位置找到Kafka, 点击过去会找到一段Scala的代码        import org.apache.spark.streaming.kafka._      val kafkaStream = KafkaUtils.cre...

Akka入门实例

Akka入门实例 Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。 Actor模型并非什么新鲜事物,它由Carl Hewitt于上世纪70年代早期提出,目的是为了解决分布式编程中一系列的编程问题。其特点如下: 系统中的所有事物都可以扮演一个Actor Actor之间完全独...

RocketMQ 4.x 介绍以及安装

Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件. 官网:http://rocketmq.apache.org/ 特点 支持 Broker 和 Consumer 端消息过滤 支持发布订阅模型,和点对点, 支持拉 pull 和推 push 两种消息模式 单一队列百万消息、亿级消息堆积 支持单 master 节点,多 mas...