MetadataCache Updates

Summary:
When MetadataCache is updated, and how the updateCache method applies those updates to the cache.

When is MetadataCache updated?

The updateCache method is what actually writes the cache; the call stacks below trace each path that reaches it.

Initiating thread: controller-event-thread

During controller election

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onControllerFailover 288
kafka/controller/KafkaController elect 1658
kafka/controller/KafkaController$Startup$ process 1581
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70
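
This stack (like the ones below) bottoms out in KafkaController.sendUpdateMetadataRequest, which just batches an UpdateMetadataRequest for the target brokers and flushes the batch. Approximately, from the 1.0.x-era source (a reference excerpt, not guaranteed verbatim):

// KafkaController.scala
  private def sendUpdateMetadataRequest(brokers: Seq[Int],
                                        partitions: Set[TopicPartition] = Set.empty[TopicPartition]) {
    try {
      // Start a fresh batch, add UPDATE_METADATA for the target brokers, then flush.
      brokerRequestBatch.newBatch()
      brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
      brokerRequestBatch.sendRequestsToBrokers(epoch)
    } catch {
      case e: IllegalStateException =>
        handleIllegalState(e)
    }
  }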

Election happens at startup, and startup itself is modeled as an event:


// KafkaController.scala
  case object Startup extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      // Register the ZooKeeper listeners first, then try to become the controller.
      registerSessionExpirationListener()
      registerControllerChangeListener()
      elect()
    }

  }
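
Because Startup goes through the same queue as every other ControllerEvent, election (and the UpdateMetadataRequest it triggers) runs on the single controller-event-thread; serializing all controller state changes through one thread is why the stacks above all share the same ControllerEventThread.doWork tail.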

When a broker starts

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onBrokerStartup 387
kafka/controller/KafkaController$BrokerChange process 1208
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

When a topic is deleted

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/TopicDeletionManager kafka$controller$TopicDeletionManager$$onTopicDeletion 268
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
scala/collection/immutable/Set$Set1 foreach 94
kafka/controller/TopicDeletionManager resumeDeletions 333
kafka/controller/TopicDeletionManager enqueueTopicsForDeletion 110
kafka/controller/KafkaController$TopicDeletion process 1280
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

When a topic is created or modified

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerBrokerRequestBatch updateMetadataRequestBrokerSet 291
kafka/controller/ControllerBrokerRequestBatch newBatch 294
kafka/controller/PartitionStateMachine handleStateChanges 105
kafka/controller/KafkaController onNewPartitionCreation 499
kafka/controller/KafkaController onNewTopicCreation 485
kafka/controller/KafkaController$TopicChange process 1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

Topic creation follows the take-an-event-off-the-queue-and-process-it pattern.
The queue is kafka.controller.ControllerEventManager.queue.
Enqueueing works as follows; at bottom it is just a watch on the children of a ZooKeeper path (see the sketch after this stack):

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerEventManager put 44
kafka/controller/TopicChangeListener handleChildChange 1712
org/I0Itec/zkclient/ZkClient$10 run 848
org/I0Itec/zkclient/ZkEventThread run 85
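
To make the queue mechanism concrete, here is a minimal self-contained sketch of its shape. This is an illustration only, not the Kafka source: the names match the real classes where possible, but the types are simplified.

// Simplified sketch of ControllerEventManager (illustrative, not the real source)
import java.util.concurrent.LinkedBlockingQueue

trait ControllerEvent { def process(): Unit }

class ControllerEventManagerSketch {
  // ZK listener callbacks put events here; a single thread drains it,
  // so all controller state changes are serialized.
  private val queue = new LinkedBlockingQueue[ControllerEvent]

  def put(event: ControllerEvent): Unit = queue.put(event)

  val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val event = queue.take() // blocks until a listener enqueues an event
        event.process()          // e.g. TopicChange.process() -> onNewTopicCreation
      }
    }
  }, "controller-event-thread")
}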

The listener registration code:

// class KafkaController
  private def registerTopicChangeListener() = {
    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
  }

As an aside, six places subscribe to ZooKeeper child-node changes:

  • DynamicConfigManager.startup
  • registerTopicChangeListener
  • registerIsrChangeNotificationListener
  • registerTopicDeletionListener
  • registerBrokerChangeListener
  • registerLogDirEventNotificationListener

Handling the topic-creation event:

// ControllerChannelManager.scala  class ControllerBrokerRequestBatch
  def sendRequestsToBrokers(controllerEpoch: Int) {
  // .......
      val updateMetadataRequest = {
        // Build the live-broker list; v0 only carried host/port per broker,
        // v1+ carries all endpoints plus the rack.
        val liveBrokers = if (updateMetadataRequestVersion == 0) {
          // .......
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava,
          liveBrokers.asJava)
      }
      // Queue UPDATE_METADATA for every broker targeted by this batch.
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
      }
      // .......
    }

One step further into the metadata update on topic creation:
the request is wrapped into a QueueItem and placed on the target broker's send queue, where the send thread will pick it up.
The enqueueing code:

// ControllerChannelManager
  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                  callback: AbstractResponse => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }

Call stack:

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerChannelManager sendRequest 81
kafka/controller/KafkaController sendRequest 662
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
scala/collection/mutable/HashTable$class foreachEntry 241
scala/collection/mutable/HashMap foreachEntry 40
scala/collection/mutable/HashMap foreach 130
kafka/controller/ControllerBrokerRequestBatch sendRequestsToBrokers 502
kafka/controller/PartitionStateMachine handleStateChanges 105
kafka/controller/KafkaController onNewPartitionCreation 499
kafka/controller/KafkaController onNewTopicCreation 485
kafka/controller/KafkaController$TopicChange process 1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

The send thread then transmits the request:

// ControllerChannelManager.scala class RequestSendThread
  override def doWork(): Unit = {

    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
    //...
    while (isRunning.get() && !isSendSuccessful) {
        // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
        // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
        try {
          if (!brokerReady()) {
            isSendSuccessful = false
            backoff()
          }
          else {
            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
              time.milliseconds(), true)
            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
              "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                requestBuilder.toString, brokerNode.toString), e)
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      // ......
  }
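
ControllerChannelManager keeps one RequestSendThread and one message queue per broker (the brokerStateInfo map seen in sendRequest above), so a broker that is slow or offline only stalls its own queue; sends to the other brokers proceed independently.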

Broker-side handler thread

CLASS_NAME METHOD_NAME LINE_NUM
kafka/server/MetadataCache kafka$server$MetadataCache$$addOrUpdatePartitionInfo 150
kafka/utils/CoreUtils$ inLock 219
kafka/utils/CoreUtils$ inWriteLock 225
kafka/server/MetadataCache updateCache 184
kafka/server/ReplicaManager maybeUpdateMetadataCache 988
kafka/server/KafkaApis handleUpdateMetadataRequest 212
kafka/server/KafkaApis handle 142
kafka/server/KafkaRequestHandler run 72

Thread: kafka-request-handler-5
The partitionMetadataLock read-write lock makes cache reads and writes thread safe. The metadata itself was already assembled on the controller side when the request was built; this step also updates the live-broker list, among other things.
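
A minimal self-contained sketch of that write path under the lock (illustrative only; the real MetadataCache.updateCache works on UpdateMetadataRequest.PartitionState and Broker objects, reduced to placeholders here):

// Simplified sketch of MetadataCache (illustrative, not the real source)
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Placeholder for UpdateMetadataRequest.PartitionState in the real code.
case class PartitionState(leader: Int, isr: Seq[Int])

class MetadataCacheSketch {
  private val partitionMetadataLock = new ReentrantReadWriteLock()
  private val cache = mutable.Map.empty[String, mutable.Map[Int, PartitionState]]
  private var aliveBrokers = Map.empty[Int, String] // brokerId -> endpoint

  def updateCache(brokers: Map[Int, String],
                  partitionStates: Map[(String, Int), PartitionState]): Unit = {
    val w = partitionMetadataLock.writeLock()
    w.lock()
    try {
      aliveBrokers = brokers // replace the live-broker view wholesale
      partitionStates.foreach { case ((topic, partition), state) =>
        // add or overwrite the cached state for each partition in the request
        cache.getOrElseUpdate(topic, mutable.Map.empty)(partition) = state
      }
    } finally w.unlock()
  }

  def getPartitionState(topic: String, partition: Int): Option[PartitionState] = {
    val r = partitionMetadataLock.readLock()
    r.lock()
    try cache.get(topic).flatMap(_.get(partition))
    finally r.unlock()
  }
}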

Still to cover: leader changes, ISR changes, and similar triggers.
