kafka connect 使用

摘要:
修改一直即可。

connect地址

https://www.confluent.io/hub

安装启动

https://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config

https://docs.confluent.io/current/connect/managing/install.html#install-connectors

REST操作connect

参考:https://docs.confluent.io/current/connect/references/restapi.html#connect-userguide-rest

遇到到的问题

1、启动时候日志中一直刷如下日志,

Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

解决:

connect相关配置文件中bootstrap.servers=localhost:9092 ,这与kafka server.properties文件中配置的不一致!修改一直即可。

2、Couldn't start HdfsSinkConnector due to configuration error.

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value for configuration locale: Locale cannot be empty

解决:

意思是要配置一个参数,经查看相关源码以及可配置属性确认为locale ,中国 配置 locale=zh_CN ;这个参数是文件分区需要使用的;

Invalid value for configuration timezone: Timezone cannot be empty

解决:timezone=Asia/Shanghai

3、JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

解决:

需要遵循一定的格式,参见:https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverterhttps://github.com/confluentinc/kafka-connect-jdbc/issues/574

从源码中找的范例如下:

{"schema":{"type":"struct","fields":[{"type":"boolean","optional":true,"field":"booleanField"},{"type":"int32","optional":true,"field":"intField"},{"type":"int64","optional":true,"field":"longField"},{"type":"string","optional":false,"field":"stringField"}]},"payload":{"booleanField":"true","intField":88,"longField":32,"stringField":"str"}}

4、io.confluent.connect.storage.errors.HiveMetaStoreException: Hive MetaStore exception

最后一个Caused by 如下,

Caused by: InvalidObjectException(message:default.test_hdfs table not found)

解决:

免责声明:文章转载自《kafka connect 使用》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇php并发控制 , 乐观锁nvm-window常用命令下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

spring kafka消费者配置介绍----ackMode

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式; ackMode有以下7种值: public enum AckMode { // 当每一条记录被消费者监听器(ListenerConsumer)处理...

kafka消息的处理机制(五)

这一篇我们不在是探讨kafka的使用,前面几篇基本讲解了工作中的使用方式,基本api的使用还需要更深入的去钻研,多使用才会有提高。今天主要是探讨一下kafka的消息复制以及消息处理机制。 1. broker的注册 Kafka使用Zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在ka...

涨姿势了解一下Kafka消费位移可好?

摘要:Kafka中的位移是个极其重要的概念,因为数据一致性、准确性是一个很重要的语义,我们都不希望消息重复消费或者丢失。而位移就是控制消费进度的大佬。本文就详细聊聊kafka消费位移的那些事,包括: 概念剖析 kafka的两种位移 关于位移(Offset),其实在kafka的世界里有两种位移: 分区位移:生产者向分区写入消息,每条消息在分区中的位置信息...

什么,kafka能够从follower副本读数据了 —kafka新功能介绍

最近看了kafka2.4新版本的一些功能特性,不得不说,在kafka2.0以后,kafka自身就比较少推出一些新的feature了,基本都是一些修修补补的东西。倒是kafka connect和kafka stream相关的开发工作做的比较多。可能kafka的野心也不局限于要当一个中间件,而是要实现一个流处理系统的生态了。 这次要介绍的是我觉得比较有意思的两...

X 利用ogg实现oracle到kafka的增量数据实时同步

利用ogg实现oracle到kafka的增量数据实时同步 前言 https://dongkelun.com/2018/05/23/oggOracle2Kafka/ ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json。下面是我的源端和目标...

zookeeper部署kafka集群

1.准备工作: iptables -F #关闭防火墙 systemctl stop firewalld.service #关闭防火墙 准备三台虚拟机并放入/etc/hosts下 192.168.100.242 testceph 192.168.100.244 redis1 192.168.100.245 redis2 将testceph的/etc/hos...