kafka消费组创建和删除原理

摘要:
Defmain{//…valconsumerGroupService={ifnewKafka ConsumerGroupService//对于新版本的kafka,KafkaConsumerGroupServiceselsenewZkConsumerGroupService}尝试{ifconsumerGroupServices.list()//以此为例查看消费组elseiffeconsumerGroupService的形式。describecase_=˃抛出newIllegalStateException}}//…}让我们以KafkaConsumerGroupService#list为例来查看消费组的形式。KafkaConsumerGroupService#list用于获取所有消费组。要获取代理上的消费组,需要发送ApiKeys。LIST_ GROUPS的请求。从这里,我们可以看到消费组与GroupMetadataManager相关。

0.10.0.0版本的kafka的消费者和消费组已经不在zk上注册节点了,那么消费组是以什么形式存在的呢?

1 入口

看下kafka自带的脚本kafka-consumer-groups.sh,可见脚本调用了kafka.admin.ConsumerGroupCommand

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

看下ConsumerGroupCommand,从代码中可以看出新版本的kafka不支持删除消费组操作,实际上,当消费组内消费者为空的时候消费组就会被删除。

def main(args: Array[String]) {
    // ...
    val consumerGroupService = {
      if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) // 对于新版本kafka来说调用的是KafkaConsumerGroupService
      else new ZkConsumerGroupService(opts)
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.list() // 以此为例来看下消费组存在的形式
      else if (opts.options.has(opts.describeOpt))
        consumerGroupService.describe()
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.delete()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService")
        }
      }
    } 
    // ...
  }

我们以KafkaConsumerGroupService#list为例来看下消费组存在的形式。KafkaConsumerGroupService#list用于获取所有的消费组。沿着代码一直追溯可以看到其会调用AdminClient#listAllGroups。从代码中可以看出要想获取到所有消费组,就需要遍历每个broker。而要获取某个broker上的消费组则需要发送ApiKeys.LIST_GROUPS的请求。

def listAllGroups(): Map[Node, List[GroupOverview]] = {
    findAllBrokers.map {
      case broker =>
        broker -> { // 需要遍历每个broker
          try {
            listGroups(broker)
          } catch {
            case e: Exception =>
              debug(s"Failed to find groups from broker ${broker}", e)
              List[GroupOverview]()
          }
        }
    }.toMap
}

def listGroups(node: Node): List[GroupOverview] = { // 向相应broker发送请求来获取改broker上的消费组信息
    val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
    val response = new ListGroupsResponse(responseBody)
    Errors.forCode(response.errorCode()).maybeThrow()
    response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
  }

看下KafkaApis.scala对应的请求处理方法handleListGroupsRequest

def handleListGroupsRequest(request: RequestChannel.Request) {
    // ... 
    
      val (error, groups) = coordinator.handleListGroups() // 关键,获取消费组列表
      val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } 
      new ListGroupsResponse(error.code, allGroups.asJava)
    }
    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
  }

顺着coordinator.handleListGroups一直往下,可以看到最终是调用GroupMetadataManager#currentGroups来获取到broker上的消费组的。到这里我们可以看出,消费组和GroupMetadataManager有关。

def currentGroups(): Iterable[GroupMetadata] = groupsCache.values

2 存在形式

GroupMetadata表示一个消费组,MemberMetadata表示一个消费者。先放下总结的图
group

GroupMetadataManager有个groupsCache属性保存了该broker所管辖的消费组

private val groupsCache = new Pool[String, GroupMetadata]

看下GroupMetadata的内部属性

private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {

  private val members = new mutable.HashMap[String, MemberMetadata] // 消费组的客户端
  private var state: GroupState = Stable
  var generationId = 0 // generationId 用于reblance
  var leaderId: String = null
  var protocol: String = null
  // ... 
}

// MemberMetadata表示一个消费者
private[coordinator] class MemberMetadata(val memberId: String,
                                          val groupId: String,
                                          val clientId: String,
                                          val clientHost: String,
                                          val sessionTimeoutMs: Int,
                                          var supportedProtocols: List[(String, Array[Byte])]) {

  var assignment: Array[Byte] = Array.empty[Byte] // 消费者分配到的partiton
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
  var latestHeartbeat: Long = -1
  var isLeaving: Boolean = false
  // ...
}

以上就是消费组及其消费者的存在形式,即存在缓存变量中,而不是持久在其他什么地方

3 消费组的创建

消费组是不会单独创建的,消费组的创建是在消费者第一次发送join_group请求的时候创建的。创建消费组过程也很简单,就是在GroupMetadataManager#groupsCache加入代表该消费组的GroupMetadata

GroupCoordinator#handleJoinGroup

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) {
    // ...
    } else {
      var group = groupManager.getGroup(groupId)
      if (group == null) {
        if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
        } else {
          group = groupManager.addGroup(new GroupMetadata(groupId, protocolType)) // 关键,如果group为空,则添加一个group
          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
      } else {
        doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }
    }
  }

GroupMetadataManager#addGroup

def addGroup(group: GroupMetadata): GroupMetadata = {
    val currentGroup = groupsCache.putIfNotExists(group.groupId, group) // 加入代表该消费组的GroupMetadata
    if (currentGroup != null) {
      currentGroup
    } else {
      group
    }
  }

4 消费组的删除

在第一节ConsumerGroupCommand中我们可以知道消费组是不支持手动删除的,那么消费组是怎么删除的呢,实际上当消费组中的消费者为空的时候,消费组就会被删除。

4.1 删除动作

看下GroupMetadataManager#removeGroup,我先看下删除消费组都有哪些动作

def removeGroup(group: GroupMetadata) {
    if (groupsCache.remove(group.groupId, group)) { // 从cache中移除group
        // 然后再__consumer_offsets主题中该group对应的partition写一个tombstone消息,用于压缩,这是因为__consumer_offsets不会删除,只会压缩

      val groupPartition = partitionFor(group.groupId) // 计算group相关联分区,默认是abs(hashcode) % 50
      val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition)

      // 然后将tombstone写入该partition,用于压缩
      val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
        timestamp = timestamp, magicValue = magicValue)

      val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
      partitionOpt.foreach { partition =>
        val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)

        trace("Marking group %s as deleted.".format(group.groupId))

        try {
          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
        } catch {
          case t: Throwable =>
            error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
          // ignore and continue
        }
      }
    }
  }

由以上可以看出,删除消费组有两个动作

  1. 将cache,即(Pool[String, GroupMetadata])中的消费组移除
  2. 在__consumer_offsets中要删除消费组相关的partition中写入tombstone,而不会删除要删除消费组的相关记录

4.2 触发删除的动作

唯一调用GroupMetadataManager#removeGroup的地方是GroupCoordinator#onCompleteJoin,而调用GroupCoordinator#onCompleteJoin的唯一地方是DelayedJoin。

GroupCoordinator#onCompleteJoin

def onCompleteJoin(group: GroupMetadata) {
    // ...
        if (group.isEmpty) {
          group.transitionTo(Dead) // 先将消费组置位dead状态,然后移除
          groupManager.removeGroup(group)
          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
        }
      }
      // ...
}

GroupCoordinator#onCompleteJoin

private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                            group: GroupMetadata,
                                            sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}

难道是在joinGroup操作的时候删除消费组吗?其实并不是,而是在heartbeat超时的时候删除的,即当最后一个消费者心跳超时或者说消费组内没有了消费者的时候,该消费组就对被删除。从DelayedHeartbeat开始看下

private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                            group: GroupMetadata,
                                            member: MemberMetadata,
                                            heartbeatDeadline: Long,
                                            sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout) {
  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) // 关注这里
  override def onComplete() = coordinator.onCompleteHeartbeat()
}

def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group synchronized {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline))
        onMemberFailure(group, member) // 关注这里
    }
  }
}

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
    group.remove(member.memberId)
    group.currentState match {
      case Dead =>
      case Stable | AwaitingSync => maybePrepareRebalance(group) // 假设消费组有一个消费者处于Stable状态,当该消费者超时后,就会调用maybePrepareRebalance
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
}

private def maybePrepareRebalance(group: GroupMetadata) {
    group synchronized {
      if (group.canRebalance)
        prepareRebalance(group) // 关注这里
    }
}

private def prepareRebalance(group: GroupMetadata) {
    
    if (group.is(AwaitingSync))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

    group.transitionTo(PreparingRebalance)
    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

    val rebalanceTimeout = group.rebalanceTimeout
    val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout) // 最终DelayedJoin在这里被调用
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

由以上我们可以总结出,就是在heartbeat超时后会进行reblance操作,最终调用GroupCoordinator#prepareRebalance,这个时候如果消费组中members为空则会删除。

5 总结

  1. 消费组只存在一个Pool[String, GroupMetadata], 并没有持久化
  2. 当第一个消费者join请求来的时候,才会创建消费组,创建消费组即在Pool[String, GroupMetadata]加入代表该消费组的GroupMetadata
  3. 不能手动删除消费组,删除消费组的时机是当最后一个消费者离开的时候,会触发heartbeat超时从而reblance将消费组删除
  4. 消费组删除涉及两个动作,一个是将消费组从Pool[String, GroupMetadata]中移除,另一个是在__consumer_offsets中写入tombstone
  5. __consumer_offsets只会压缩不会删除

免责声明:文章转载自《kafka消费组创建和删除原理》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇aos.js超赞页面滚动元素动画jQuery动画库Lodash学习笔记下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

ASP.NET Core 中间件(Middleware)(一)

本文主要目标:记录Middleware的运行原理流程,并绘制流程图。 目录结构: 1、运行环境 2、Demo实践 3、源码追踪 4、AspnetCore内置middleware 一、运行环境 Visual Studio Community 2019 版本 16.8.5 .Net Sdk Version: 5.0.103 二、Demo实践 讲解或学习一个东...

spark性能调优06-数据倾斜处理

1、数据倾斜 1.1 数据倾斜的现象 现象一:大部分的task都能快速执行完,剩下几个task执行非常慢 现象二:大部分的task都能快速执行完,但总是执行到某个task时就会报OOM,JVM out of Memory,task faild,task lost,resubmitting task等错误 1.2 出现的原因 大部分task分配的数据很少(某...

.Net Task<T>的一种比较神奇的卡死情况(Wait/Result卡死, await能得到结果)

出现的环境.Net4.0 + WebApi1(4.0.30506.0) +Microsoft.Bcl.Async.1.0.168 自己死活看不出原因, 分享出来给大家看看,希望有人能找到问题的关键 出现错误的是下面这两个模块 下面的CorsMessageHandler,抄的http://www.cnblogs.com/artech/p/cors-4-asp...

json前后台传输,以及乱码中文问题探讨

背景介绍:   我现在的工作是做传统项目开发,没有用到框架。最近在做项目时,经常需要使用ajax从后台拿数据到前台,是json格式的。先说下我在项目中遇到的问题吧,前台拿到了数据,需要将其转化为对象,我使用的是jquery插件带有的jQuery.parseJSON() 这个方法,没有效果,使用浏览器自带的JSON.parse(str)也是没有效果,通过查阅...

VLC客户端和SDK的简单应用

VLC_SDK编程指南 VLC 是一款自由、开源的跨平台多媒体播放器及框架,可播放大多数多媒体文件,以及 DVD、音频 CD、VCD 及各类流媒体协议。它可以支持目前市面上大多数的视频解码,除了Real。 VLC_SDK的调用 VLC的SDK使用C语言写成,它的解码库部分的基础是FFMpeg,FFMpeg也是一套可以用来记录、转换数字音频、视频,并能将其转...

(一)JIRA API 对接

系统要跟JIRA对接,将本系统数据发送给jira,jira数据返回给本系统。 开始一头雾水怎么让数据传过去已什么形式存在,是存数据库呢还是怎么显示呢。研究半天发现其实只要将原数据作为json数据提供给jira接口,jira接口进行创建issue。 但前提在于要先创建项目。 jira的API 有很多有创建项目的,创建问题等。在线找到了6.1版本的API,根据...