华为云用户手册

  • Smart Connect监控指标 表5 Smart Connect支持的监控指标 指标ID 指标名称 指标含义 取值范围 测量对象 监控周期(原始指标) kafka_wait_synchronize_data 待同步Kafka数据量 Kafka任务的待同步数据量。 单位:Count ≥ 0 Kafka实例的Smart Connect任务 1分钟 kafka_synchronize_rate Kafka每分钟同步数据量 Kafka任务每分钟同步的数据量。 单位:Count ≥ 0 Kafka实例的Smart Connect任务 1分钟 task_status 任务状态 当前任务状态。 0:任务异常 1:任务正常 Kafka实例的Smart Connect任务 1分钟 message_delay 消息时延 消息到达源端的时间与到达目标端的时间之差。 单位:ms ≥ 0 Kafka实例的Smart Connect任务 1分钟 使用Smart Connect监控指标时,请注意如下几点: Kafka双向数据复制的Smart Connect任务在监控中会被拆分为2个任务,分别为“Smart Connect任务名_source_0”和“Smart Connect任务名_source_1”。 如果Topic中的消息在进行下一次数据同步前,已经全部老化,此时实际是没有待同步的Kafka数据,但是Kafka数据同步监控指标使用的是包含老化数据的offset值,“待同步Kafka数据量”会显示老化的消息数。
  • 维度 Key Value kafka_instance_id Kafka实例 kafka_broker Kafka实例节点 kafka_topics Kafka实例主题 kafka_partitions Kafka实例分区 kafka_groups-partitions Kafka实例分区的消费组 kafka_groups_topics Kafka实例队列的消费组 kafka_groups Kafka实例的消费组 connector_task Kafka实例的Smart Connect任务
  • 主题监控指标 表3 主题支持的监控指标 指标ID 指标名称 指标含义 取值范围 测量对象 监控周期(原始指标) topic_bytes_in_rate 生产流量 该指标用于统计每秒生产的字节数。 单位:Byte/s、KB/s、MB/s、GB/s 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Kafka实例队列 1分钟 topic_bytes_out_rate 消费流量 该指标用于统计每秒消费的字节数。 单位:Byte/s、KB/s、MB/s、GB/s 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000000 Kafka实例队列 1分钟 topic_data_size 队列数据容量 该指标用于统计队列当前的消息数据大小。 单位:Byte、KB、MB、GB、TB、PB 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~5000000000000 Kafka实例队列 1分钟 topic_messages 队列消息总数 该指标用于统计队列当前的消息总数。 单位:Count 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟 topic_messages_in_rate 消息生产速率 该指标用于统计每秒生产的消息数量。 单位:Count/s 在“主题”页签,当“监控类型”为“基本监控”时,才包含该指标。 0~500000 Kafka实例队列 1分钟 partition_messages 分区消息数 该指标用于统计分区中当前的消息个数。 单位:Count 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟 produced_messages 生产消息数 该指标用于统计目前生产的消息总数。 单位:Count 在“主题”页签,当“监控类型”为“分区监控”时,才包含该指标。 ≥ 0 Kafka实例队列 1分钟
  • 实例监控指标 表1 实例支持的监控指标 指标ID 指标名称 指标含义 取值范围 测量对象 监控周期(原始指标) current_partitions 分区数 该指标用于统计Kafka实例中已经使用的分区数量。 单位:个 0~100000 Kafka实例 1分钟 current_topics 主题数 该指标用于统计Kafka实例中已经创建的主题数量。 单位:个 0~100000 Kafka实例 1分钟 group_msgs 堆积消息数 该指标用于统计Kafka实例中所有消费组中总堆积消息数。 单位:个 0~1000000000 Kafka实例 1分钟 instance_bytes_in_rate 生产流量 统计实例中每秒生产的字节数。 单位:Byte/s、KiB/s、MiB/s、GiB/s、TiB/s、PiB/s 部分存量实例不支持此监控,具体以控制台为准。 0~1000000 Kafka实例 1分钟 instance_bytes_out_rate 消费流量 统计实例中每秒消费的字节数。 单位:Byte/s、KiB/s、MiB/s、GiB/s、TiB/s、PiB/s 部分存量实例不支持此监控,具体以控制台为准。 0~1000000 Kafka实例 1分钟 current_partitions_usage 分区使用率 该指标用于统计分区使用率。 单位:% 部分存量实例不支持此监控,具体以控制台为准。 0~100% Kafka实例 1分钟
  • 步骤二:开始诊断 单击“开始诊断”,在“诊断记录”区域,显示一条“状态”为“诊断中”的记录。 当“状态”变为“诊断成功”时,表示消息积压诊断已经完成。 在诊断记录所在行,单击“查看详情”,进入“诊断详情”页面。 在页面顶部查看消息组异常项、失败项和正常项的数量。在“诊断维度”区域,单击包含异常项的维度名称,例如:重平衡,查看导致异常的可能原因、受影响的分区或者broker,以及解决消息堆积的建议。 图3 诊断详情
  • 前提条件 删除消息前,请先在消费代码中设置“auto.offset.reset”参数。“auto.offset.reset”用来指定当Kafka中没有初始偏移量或者当前偏移量不存在(例如当前偏移量已被删除)时,消费者的消费策略。取值如下: latest:偏移量自动被重置到最晚偏移量。 earliest:偏移量自动被重置到最早偏移量。 none:向消费者抛出异常。 如果将此配置设置为latest,新增分区时,生产者可能会在消费者重置初始偏移量之前开始向新增加的分区发送消息,从而导致部分消息丢失。
  • 准备实例依赖资源 创建Kafka实例前,请提前准备好如表1所示资源。 表1 Kafka实例依赖资源 资源名称 要求 创建指导 VPC和子网 Kafka实例可以使用当前账号下已创建的VPC和子网,也可以使用新创建的VPC和子网,还可以使用共享VPC和子网,请根据实际需要进行配置。 共享VPC是基于 资源访问管理 (Resource Access Manager,简称 RAM )服务的机制,VPC的所有者可以将VPC内的子网共享给一个或者多个账号使用。通过共享VPC功能,可以简化网络配置,帮助您统一配置和运维多个账号下的资源,有助于提升资源的管控效率,降低运维成本。有关VPC子网共享的更多信息,请参见共享VPC。 在创建VPC和子网时应注意:创建的VPC与Kafka实例在相同的区域。 在创建VPC和子网时应注意: 创建的VPC与Kafka实例在相同的区域。 子网开启IPv6后,Kafka实例支持IPv6功能。Kafka实例开启IPv6后,客户端可以使用IPv6地址连接实例。 创建VPC和子网的操作指导请参考创建虚拟私有云和子网,若需要在已有VPC上创建和使用新的子网,请参考为虚拟私有云创建新的子网。 安全组 不同的Kafka实例可以重复使用相同的安全组,也可以使用不同的安全组,请根据实际需要进行配置。 连接Kafka实例前,请根据连接方式配置对应的安全组,具体请参考表2。 创建安全组的操作指导请参考创建安全组,为安全组添加规则的操作指导请参考添加安全组规则。 弹性IP地址 如果客户端使用公网连接Kafka实例,请提前创建弹性IP地址。 在创建弹性IP地址时,应注意如下要求: 创建的弹性IP地址与Kafka实例在相同的区域。 弹性IP地址的数量必须与Kafka实例的代理个数相同。 Kafka控制台无法识别开启IPv6转换功能的弹性IP地址。 创建弹性IP地址的操作指导请参考申请弹性公网IP。
  • 使用PEM格式证书访问Kafka实例 以下示例演示在Java客户端使用PEM格式证书访问Kafka实例。 参考Java客户端接入示例,连接Kafka实例生产消费消息。其中,生产消息配置文件和消费消息配置文件中的SASL信息修改为如下内容。 #SASL认证机制为“PLAIN”时,配置信息如下。 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="username" \ password="password"; #SASL认证机制为“SCRAM-SHA-512”时,配置信息如下。 sasl.mechanism=SCRAM-SHA-512 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="username" \ password="password"; #设置Kafka安全协议。 security.protocol=SASL_SSL #ssl truststore.location为SSL证书的存放路径,如下代码以Windows系统路径格式举例,您在使用时请根据实际运行环境修改路径格式。 ssl.truststore.location=E:\\temp\\client.pem #ssl.truststore.password为服务器证书密码,使用PEM格式证书访问Kafka实例,无需配置ssl.truststore.password参数。 #ssl.truststore.password=dms@kafka #ssl.endpoint.identification.algorithm为证书 域名 校验开关,为空则表示关闭,这里需要保持关闭状态,必须设置为空。 ssl.endpoint.identification.algorithm= #添加ssl.truststore.type参数,指定客户端使用的证书类型是PEM。 ssl.truststore.type=PEM
  • 变更实例规格的影响 变更实例规格的时长和代理数量有关,单个代理的变更时长一般在5~10分钟,代理数量越多,规格变更时间越长。 表2 变更实例规格的影响 变更配置类型 影响 基准带宽/代理数量 扩容基准带宽/代理数量不会影响原来的代理,业务也不受影响。 扩容基准带宽/代理数量时,系统会根据当前磁盘大小进行相应比例的存储空间扩容。例如扩容前实例的代理数为3,每个代理的磁盘大小为200GB,扩容后实例的代理数为10,此时代理的磁盘大小依旧为200GB,但是总磁盘大小为2000GB。 新创建的Topic才会分布在新代理上,原有Topic还分布在原有代理上,造成分区分布不均匀。通过修改分区平衡,实现将原有Topic分区的副本迁移到新代理上。 存储空间 扩容存储空间有次数限制,只能扩容20次。 扩容存储空间不会影响业务。 代理规格 若Topic为单副本,扩容/缩容期间无法对该Topic生产消息或消费消息,会造成业务中断。 若Topic为多副本,扩容/缩容代理规格不会造成服务中断,但可能会导致消费的分区消息发生乱序,请谨慎评估业务影响,建议您在业务低峰期扩容/缩容。 扩容/缩容代理规格的过程中,节点滚动重启造成分区Leader切换,会发生秒级连接闪断,在用户网络环境稳定的前提下,Leader切换时长一般为1分钟以内。多副本的Topic需要在生产客户端配置重试机制,方法如下: 生产客户端为Kafka开源客户端时,检查是否配置retries参数,建议此参数值设置为3~5。 生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(20))); 若实例已创建的分区数总和大于待缩容规格的实例分区数上限,此时无法缩容。不同规格配置的实例分区数上限不同,具体请参见产品规格。 例如:kafka.4u8g.cluster*3的实例,已创建800个分区,您想把此实例的规格缩容为kafka.2u4g.cluster*3,kafka.2u4g.cluster*3规格的实例分区数上限为750,此时无法缩容。
  • 约束与限制 2023年4月25日及以后创建的实例,支持在控制台创建消费组。 如果“auto.create.groups.enable”设置为“true”,当消费组的状态为“EMPTY”且从未提交过offset,系统将在十分钟后自动删除该消费组。 如果“auto.create.groups.enable”设置为“false”,系统不会自动删除消费组。如果需要删除消费组,需要您手动删除。 如果消费组从未提交过offset,当Kafka实例重启后,该消费组会被删除。
  • 分区平衡前的准备工作 在不影响业务的前提下,适当调小Topic老化时间并等待消息老化,减少迁移数据,加快迁移速度,分区平衡任务结束后可重新调整为初始值。修改Topic老化时间的步骤,请参考修改Kafka消息老化时间。 确保分区平衡的目标Broker磁盘容量充足,在磁盘存储统计中查看每个Broker的可用磁盘容量。如果目标Broker磁盘剩余容量接近分区平衡迁移到该Broker上的数据量,为了给目标Broker上的消息生产预留存储空间,应先进行磁盘扩容,再进行分区平衡。
  • 操作影响 对数据量大的Topic进行分区平衡,会占用大量的网络和存储带宽,业务可能会出现请求超时或者时延增大,建议在业务低峰期时操作。对Topic进行分区平衡前,根据Kafka实例规格对比当前实例负载情况,评估是否可以进行分区平衡,建议预留足够的带宽进行分区平衡,CPU使用率在90%以上时,不建议进行分区平衡。Topic的数据量和CPU使用率可以通过监控页面的“队列数据容量”和“CPU使用率”查看,具体步骤请参考查看Kafka监控数据。 带宽限制是指设定Topic进行副本同步的带宽上限,确保不会对该实例上的其他Topic造成流量冲击。但需要注意,带宽限制不会区分是正常的生产消息造成的副本同步还是分区平衡造成的副本同步,如果带宽限制设定过小,可能会影响正常的生产消息,且可能会造成分区平衡一直无法结束。如果分区平衡一直无法结束,请联系客服处理。 分区平衡任务启动后,不能删除正在进行分区平衡的Topic,否则会导致分区平衡任务无法结束。 分区平衡任务启动后,无法修改Topic的分区数。 分区平衡任务启动后,无法手动停止任务,需要等到任务完成。 已设置了一个定时分区平衡任务,在此任务未执行前,无法对本实例内的任何Topic执行其他分区平衡任务。 分区平衡后Topic的metadata会改变,如果生产者不支持重试机制,会有少量的请求失败,导致部分消息生产失败。 数据量大的Topic进行分区平衡的时间会比较长,建议根据Topic的消费情况,适当调小Topic老化时间,使得Topic的部分历史数据被及时清理,加快迁移速度。Topic的数据量可以通过监控页面的“队列数据容量”查看,具体步骤请参考查看Kafka监控数据。
  • 修改定时分区平衡任务 在“后台任务管理”页面的“定时任务”页签中,单击页面左上角下拉框,选择时间段,在搜索对话框中输入待修改定时分区平衡任务的Topic名称,按“Enter”,实现快速查找定时分区平衡任务。 图5 查找定时分区平衡任务 在待修改的定时分区平衡任务后,单击“修改”。 在弹出的“修改定时任务”对话框中,您可以修改定时分区平衡任务的时间,还可以取消定时分区平衡任务,具体操作如下。 修改定时分区平衡任务的时间:修改时间,单击“确定”。 取消定时分区平衡任务:选择“取消”,如图6所示,单击“确定”。 图6 取消定时分区平衡任务
  • 带宽限制值估算方法 带宽限制值受分区平衡任务执行时间、分区副本Leader/Follower分布情况以及消息生产速率等因素影响,具体分析如下。 带宽限制值作用范围为整个Broker,对该Broker内所有副本同步的分区进行带宽限流。 带宽限制会将分区平衡后新增的副本视为Follower副本限流,分区平衡前的Leader副本视为Leader副本限流,Leader副本的限流和Follower副本限流分开计算。 带宽限制不会区分是正常的生产消息造成的副本同步还是分区平衡造成的副本同步,因此两者的流量都会被限流统计。 假设分区平衡任务需要在200s内完成,每个副本的数据量为100MB,在以下几种场景中,估算带宽限制值。 场景一:Topic1有2分区2副本,Topic2有1分区1副本,所有Leader副本在同一个Broker上,如表4所示,Topic1和Topic2分别需要新增1个副本,如表5所示。 表4 分区平衡前Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,1 Topic1 1 0 0,2 Topic2 0 0 0 表5 分区平衡后Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,1,2 Topic1 1 0 0,1,2 Topic2 0 0 0,2 图7 场景一分区平衡图 如图7所示,有3个副本需要从Broker 0拉取数据,Broker 0中每个副本的数据量为100MB,Broker 0中只有Leader副本,Broker 1和Broker 2中只有Follower副本,由此得出以下数据: Broker 0在200s内完成分区平衡所需的带宽限制值=(100+100+100)/200=1.5MB/s Broker 1在200s内完成分区平衡所需的带宽限制值=100/200=0.5MB/s Broker 2在200s内完成分区平衡所需的带宽限制值=(100+100)/200=1MB/s 综上所述,若想要在200s内完成分区平衡任务,带宽限制值应设为大于等于1.5MB/s。由于控制台的带宽限制只能是整数,因此带宽限制值应设为大于等于2MB/s。 场景二:Topic1有2分区1副本,Topic2有2分区1副本,Leader副本分布在不同Broker上,如表6所示,Topic1和Topic2分别需要新增1个副本,如表7所示。 表6 分区平衡前Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0 Topic1 1 1 1 Topic2 0 1 1 Topic2 1 2 2 表7 分区平衡后Topic分区的副本分布 Topic名称 分区名称 Leader副本所在Broker Follower副本所在Broker Topic1 0 0 0,2 Topic1 1 1 1,2 Topic2 0 1 1,2 Topic2 1 2 2,0 图8 场景二分区平衡图
  • 操作影响 当流控值达到上限后,会导致生产/消费的时延增大。 设置的流控值较小且生产者速率较大时,可能会造成生产超时、消息丢失,导致部分消息生产失败。 初始生产/消费的流量较大,如果设置一个较小的流控值,会导致生产/消费的时延增大、部分消息生产失败。建议逐次减半设置流控值,待生产/消费稳定后继续减半设置,直到设置为目标流控值。例如初始生产流量100MB/s,可先设置生产流控为50MB/s,待稳定后再修改为25MB/s,直到目标流控值。
  • 迁移方案二:同时消费,后迁生产 指消费者业务启用多个消费客户端,分别向原Kafka和新Kafka实例消费消息,然后将生产业务切到新Kafka实例,这样能确保所有消息都被及时消费。 本方案中消费业务会在一段时间内同时消费原Kafka和新Kafka实例。由于在迁移生产业务之前,已经有消费业务运行在新Kafka实例上,因此不会存在端到端时延的问题。但在迁移生产的开始阶段,同时消费原Kafka与新Kafka实例,会导致部分消息之间的生产顺序无法保证,存在消息乱序的问题。此场景适用于对端到端时延有要求,却对消息顺序不敏感的业务。 启动新的消费客户端,配置Kafka连接地址为新Kafka实例的连接地址,消费新Kafka实例中的数据。 原有消费客户端需继续运行,消费业务同时消费原Kafka与新Kafka实例的消息。 修改生产客户端,Kafka连接地址改为新Kafka实例的连接地址。 重启生产客户端,将生产业务迁移到新Kafka实例中。 生产业务迁移后,观察连接新Kafka实例的消费业务是否正常。 等待原Kafka中数据消费完毕,关闭原有消费业务客户端。 迁移结束。
  • 迁移方案一:先迁生产,再迁消费 指先将生产消息的业务迁移到新的Kafka,原Kafka不会有新的消息生产。待原有Kafka实例的消息全部消费完成后,再将消费消息业务迁移到新的Kafka,开始消费新Kafka实例的消息。 本方案为业界通用的迁移方案,操作步骤简单,迁移过程由业务侧自主控制,整个过程中消息不会存在乱序问题,适用于对消息顺序有要求的场景。但是该方案中需要等待消费者业务直至消费完毕,存在一个时间差的问题,部分数据可能存在较大的端到端时延。 将生产客户端的Kafka连接地址修改为新Kafka实例的连接地址。 重启生产业务,使得生产者将新的消息发送到新Kafka实例中。 观察各消费组在原Kafka的消费进度,直到原Kafka中数据都已经被消费完毕。 将消费客户端的Kafka连接地址修改为新Kafka实例的连接地址。 重启消费业务,使得消费者从新Kafka实例中消费消息。 观察消费者是否能正常从新Kafka实例中获取数据。 迁移结束。
  • 迁移准备 配置网络环境。 Kafka实例分内网地址以及公网地址两种网络连接方式。如果使用公网地址,则消息生成与消费客户端需要有公网访问权限,并配置如下安全组。 表1 安全组规则 方向 协议 端口 源地址 说明 入方向 TCP 9094 Kafka客户端所在的IP地址或地址组 通过公网访问Kafka(明文接入)。 入方向 TCP 9095 Kafka客户端所在的IP地址或地址组 通过公网访问Kafka(密文接入)。 创建目标Kafka实例。 目标Kafka的规格不能低于原业务使用的Kafka规格。具体请参考购买Kafka实例。 在目标Kafka实例中创建Topic。 在目标Kafka实例上创建与原Kafka实例相同配置的Topic,包括Topic名称、副本数、分区数、消息老化时间,以及是否同步复制和落盘等。具体请参考创建Kafka Topic。
  • 迁移方案三:先迁消费,再迁生产 指首先通过Smart Connect同步两个Kafka的消息,其次将消费端迁移到新Kafka,最后将生产端迁移到新Kafka。 本方案依赖于Smart Connect,Smart Connect实时同步源端和目标端的数据,但是消费进度是通过批处理同步的,可能会导致源端和目标端每个分区的消费进度存在0-100之间的差异,存在少量重复消费问题。此场景适用于生产端不可停止,端到端有时延要求,但是可以兼容少量重复消费的业务。 创建Kafka数据复制的Smart Connect任务,用于同步两个Kafka的消息。具体步骤请参见配置Kafka间的数据复制。 在Kafka控制台的“消息查询”页面,查看两个Kafka的最新消息是否一致,确认两个Kafka的同步进度是否一致。具体步骤请参见查看Kafka消息。 是,执行3。 否,在监控页面查看两个Kafka的“Kafka每分钟同步数据量”是否正常,如果正常,先等待两个Kafka的同步进度一致,然后执行3。 将消费客户端的Kafka连接地址修改为新Kafka实例的连接地址。 重启消费业务,使得消费者从新Kafka实例中消费消息。 观察消费者是否能正常从新Kafka实例中获取数据。 修改生产客户端,Kafka连接地址改为新Kafka实例的连接地址。 重启生产客户端,将生产业务迁移到新Kafka实例中。 生产业务迁移后,观察连接新Kafka实例的消费业务是否正常。 迁移结束。
  • 步骤三:修改客户端配置文件 替换证书后,需要在客户端的“consumer.properties”和“producer.properties”文件中,分别修改“ssl.truststore.location”和“ssl.truststore.password”参数。 security.protocol=SASL_SSL ssl.truststore.location=/opt/kafka_2.12-2.7.2/config/client.truststore.jks ssl.truststore.password=dms@kafka ssl.endpoint.identification.algorithm= ssl.truststore.location配置为client.truststore.jks证书的存放路径。 ssl.truststore.password为客户端证书的Truststore密码。 ssl.endpoint.identification.algorithm为证书域名校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
  • 操作影响 对数据量大的Topic进行分区平衡,会占用大量的网络和存储带宽,业务可能会出现请求超时或者时延增大,建议在业务低峰期时操作。对Topic进行分区平衡前,根据Kafka实例规格对比当前实例负载情况,评估是否可以进行分区平衡,建议预留足够的带宽进行分区平衡,CPU使用率在90%以上时,不建议进行分区平衡。Topic的数据量和CPU使用率可以通过监控页面的“队列数据容量”和“CPU使用率”查看,具体步骤请参考查看Kafka监控数据。 带宽限制是指设定Topic进行副本同步的带宽上限,确保不会对该实例上的其他Topic造成流量冲击。但需要注意,带宽限制不会区分是正常的生产消息造成的副本同步还是分区平衡造成的副本同步,如果带宽限制设定过小,可能会影响正常的生产消息,且可能会造成分区平衡一直无法结束。如果分区平衡一直无法结束,请联系客服处理。 分区平衡任务启动后,不能删除正在进行分区平衡的Topic,否则会导致分区平衡任务无法结束。 分区平衡任务启动后,无法修改Topic的分区数。 分区平衡任务启动后,无法手动停止任务,需要等到任务完成。 已设置了一个定时分区平衡任务,在此任务未执行前,无法对本实例内的任何Topic执行其他分区平衡任务。 分区平衡后Topic的metadata会改变,如果生产者不支持重试机制,会有少量的请求失败,导致部分消息生产失败。 数据量大的Topic进行分区平衡的时间会比较长,建议根据Topic的消费情况,适当调小Topic老化时间,使得Topic的部分历史数据被及时清理,加快迁移速度。Topic的数据量可以通过监控页面的“队列数据容量”查看,具体步骤请参考查看Kafka监控数据。
  • 分区平衡前的准备工作 在不影响业务的前提下,适当调小Topic老化时间并等待消息老化,减少迁移数据,加快迁移速度,分区平衡任务结束后可重新调整为初始值。修改Topic老化时间的步骤,请参考修改Kafka消息老化时间。 确保分区平衡的目标Broker磁盘容量充足,在磁盘存储统计中查看每个Broker的可用磁盘容量。如果目标Broker磁盘剩余容量接近分区平衡迁移到该Broker上的数据量,为了给目标Broker上的消息生产预留存储空间,应先进行磁盘扩容,再进行分区平衡。
  • 步骤五:验证接口连通性 参考使用客户端连接Kafka(关闭SASL)或者使用客户端连接Kafka(开启SASL),测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以图8为例,连接Kafka实例的地址为“192.168.0.71:9011,192.168.0.11:9011,192.168.0.21:9011”。 在Kafka实例安全组的入方向规则中放通9011端口,以及198.19.128.0/17网段的地址。 如果Kafka实例的子网配置了网络ACL功能,需要在网络ACL的入方向规则中放通198.19.128.0/17网段的地址,以及 VPC终端节点 涉及的子网。 198.19.128.0/17是为VPC终端节点分配的网段,使用VPC终端节点需要放通此网段。
  • 客户端使用VPCEP跨VPC访问Kafka时,使用的是明文连接还是密文连接? 使用明文连接还是密文连接,取决于“跨VPC访问协议”。跨VPC访问协议是在创建Kafka实例时设置的,实例创建成功后,不支持修改。 跨VPC访问协议的取值如下: PLAINTEXT:表示客户端访问Kafka实例时,无需认证,数据通过明文传输。 SASL_SSL:表示客户端访问Kafka实例时,使用SASL认证,数据通过SSL证书加密传输。 SASL_PLAINTEXT:表示客户端访问Kafka实例时,使用SASL认证,数据通过明文传输。
  • 约束与限制 若“auto.create.groups.enable”设置为“true”,当消费组的状态为“EMPTY”且从未提交过offset,系统将在十分钟后自动删除该消费组。 若“auto.create.groups.enable”设置为“false”,系统不会自动删除消费组。如果需要删除消费组,需要您手动删除。 若消费组从未提交过offset,当Kafka实例重启后,该消费组会被删除。 删除消费组会导致消费进度丢失,造成重新消费或重复消费等情况。
  • 查看消费者连接地址(Kafka Manager) 登录Kafka Manager。 单击“kafka_cluster”,进入集群详情页。 在顶部导航栏单击“Consumers”,进入消费组列表页面。 图2 导航栏 单击待查看消费者连接地址的消费组名称,进入消费组订阅的Topic列表页面。 图3 消费组列表页面 单击待查看消费者连接地址的Topic名称,进入Topic详情页。 图4 消费组订阅的Topic列表页面 在“Consumer Instance Owner”中,查看消费者连接地址。 图5 Topic详情页
  • 查看Kafka Manager 在进入Kafka Manager集群管理页面后,您可以查看Kafka集群的监控、代理等信息。 集群信息页 单击Clusters中的集群列表,即可进入集群信息页。如图3所示。 图中①区域表示功能导航栏。 Cluster:集群,统计集群列表和集群详情。 Brokers:代理,统计当前集群中各代理的状态信息。 Topic:队列,统计当前集群中的kafka队列。 Preferred Replica Election:强制进行一次队列leader的最优选举(不建议用户操作)。 Reassign Partitions:进行分区副本的重分配(不建议用户操作)。 Consumers:统计集群中的消费组状态。 图中②区域表示集群信息统计,包含集群的Topic数和集群的代理数。 图3 集群信息页 集群所有代理统计页 单击功能导航栏中的Brokers,即可进入代理统计页。如图4所示。 图中①区域代理列表,包含总的字节流入和字节流出。 图中②集群监控信息。 图4 所有代理统计页 具体代理统计页 单击id列表中具体的Broker,即可查看对应代理的统计信息。如图5所示。 图中①区域表示对应代理总的统计信息,包括队列数、分区数、分区leader数、消息速率占比、写入字节占比以及流出字节占比。 图中②区域表示代理监控信息。 图5 具体Broker信息 查看实例的Topic 在导航栏选择Topic,并在下拉列表中选择List。页面如图6所示,展示了队列列表以及分区数等。 列表中以“__”开头的队列为内部队列,严禁操作,否则可能导致业务问题。 图6 查看实例的Topic 队列详情页 单击具体的Topic名称,进入如图7所示页面。 图中①区域表示队列基本信息,包括副本数(Replication),分区数(Number of Partitions),消息数(Sum of partition offsets)等。 图中②区域表示代理与队列分区的对应关系。 图中③区域表示该队列的消费组列表。单击消费组名称可进入该消费组的详情页。 图中④区域表示队列的配置信息。详情参考Kafka队列官方配置文档(https://kafka.apache.org/documentation/#topicconfigs)。 图中⑤区域表示队列监控数据统计。 图中⑥区域表示队列分区信息,包括分区消息数(Latest Offset),分区leader(Leader),副本列表(Replicas),同步副本列表(In Sync Replicas)。 图7 队列详情页 查看消费组列表 导航栏中单击Consumers,即可查看当前集群中的消费组列表。 只显示14天内有消费记录的消费组。 图8 集群的消费组列表 查看消费组详情页 单击消费组名称可进入消费组详情页面,展示消费组消费的所有队列列表以及每个队列的可消费数(Total Lag)。 图9 消费组详情页面 查看消费组队列详情页 单击队列名称,即可进入详情页面,查看消费组消费在队列中每个分区的消费状态。包括分区编号(Partition)、分区消息数(LogSize)、分区消费进度(Consumer Offset)、分区剩余可消费数(Lag)和最近消费该分区的消费者(Consumer Instance Owner)。 图10 消费组队列详情页面
  • 登录Kafka Manager 创建一台与Kafka实例相同VPC和相同安全组的Windows服务器,详细步骤请参考购买并登录Windows弹性云服务器。 如果是已经开启了公网访问,该步骤为可选,在本地浏览器中即可访问,不需要单独的Windows弹性云服务器。 在实例详情信息页面,获取Kafka Manager地址。 未开启公网访问时,Kafka Manager地址为“Manager内网访问地址”。 图1 Kafka Manager内网访问地址 已开启公网访问时,Kafka Manager地址为“Manager公网访问地址”。 图2 Kafka Manager公网访问地址 在浏览器中输入Kafka Manager的地址,进入Kafka Manager登录页面。 如果是开启了公网访问,在本地浏览器输入Kafka Manager地址访问;如果没有开启公网访问,需要登录1的弹性云服务器,然后在浏览器输入Kafka Manager地址访问。 输入创建实例时设置的Kafka Manager用户名和密码,即可管理Kafka集群。
  • 测试脚本 ./kafka-producer-perf-test.sh --producer-props bootstrap.servers=${连接地址} acks=1 batch.size=16384 linger.ms=10 --topic ${Topic名称} --num-records 10000000 --record-size 1024 --throughput -1 --producer.config ../config/producer.properties bootstrap.servers:购买Kafka实例后,获取的Kafka实例的地址。 acks:消息主从同步策略,acks=1表示异步复制消息,acks=-1表示同步复制消息。 batch.size:每次批量发送消息的大小(单位为字节)。 linger.ms:两次发送时间间隔。 topic:创建Topic中设置的Topic名称。 num-records:总共需要发送的消息数。 record-size:每条消息的大小。 throughput:每秒发送的消息数。
  • 测试结果 测试场景一(实例是否开启SASL):相同的Topic(30分区、3副本、异步复制、异步落盘),实例分为开启SASL和未开启SASL,测试结果如下: 表3 测试结果 实例规格 磁盘类型 代理数量 TPS(开启SASL) TPS(未开启SASL) kafka.2u4g.cluster 超高I/O 3 100000 280000 kafka.4u8g.cluster 超高I/O 3 170000 496000 kafka.8u16g.cluster 超高I/O 3 200000‬ 730000 kafka.12u24g.cluster 超高I/O 3 320000 790000 kafka.16u32g.cluster 超高I/O 3 360000 1000000 结论:在Topic相同的情况下,生产消息到规格相同、接入方式不同的Kafka实例,未开启SASL的实例TPS高于开启SASL的实例TPS。 测试场景二(同步/异步复制):相同的实例(超高I/O、3个代理、未开启SASL),不同复制机制的Topic,生产者进程数为3时,测试结果如下: 表4 测试结果 实例规格 是否同步落盘 副本数 分区数 TPS(同步复制) TPS(异步复制) kafka.2u4g.cluster 否 3 30 100000 280000 kafka.4u8g.cluster 否 3 30 230000 496000 kafka.8u16g.cluster 否 3 30 342000 730000 kafka.12u24g.cluster 否 3 30 383000 790000 kafka.16u32g.cluster 否 3 30 485000 1000000 结论:生产消息到同一个Kafka实例的不同Topic中,Topic除了复制机制,其他参数相同,异步复制Topic的TPS高于同步复制Topic的TPS。 测试场景三(是否同步落盘):相同的实例(超高I/O、3个代理、未开启SASL),不同落盘机制的Topic,测试结果如下: 表5 测试结果 实例规格 是否同步复制 副本数 分区数 TPS(同步落盘) TPS(异步落盘) kafka.2u4g.cluster 否 3 30 30000 280000 kafka.4u8g.cluster 否 3 30 32500 496000 kafka.8u16g.cluster 否 3 30 36100 730000 kafka.12u24g.cluster 否 3 30 37400 790000 kafka.16u32g.cluster 否 3 30 40400 1000000 结论:生产消息到同一个Kafka实例的不同Topic中,Topic除了落盘机制,其他参数相同,异步落盘Topic的TPS远远高于同步落盘Topic的TPS。 测试场景四(不同磁盘类型):相同的Topic(30分区、3副本、异步复制、异步落盘),不同磁盘类型的实例,测试结果如下: 表6 测试结果 实例规格 代理数量 是否开启SASL TPS(高I/O) TPS(超高I/O) kafka.2u4g.cluster 3 否 110000 250000 kafka.4u8g.cluster 3 否 135000 380000 kafka.8u16g.cluster 3 否 213000 480000 kafka.12u24g.cluster 3 否 240000 577000 kafka.16u32g.cluster 3 否 280000 840000 结论:在Topic相同的情况下,生产消息到规格相同、磁盘类型不同的Kafka实例,超高I/O的实例TPS高于高I/O的实例TPS。 测试场景五(不同分区数):相同的实例(超高I/O、3个代理、未开启SASL),不同分区数的Topic,测试结果如下: 表7 测试结果 实例规格 是否同步落盘 是否同步复制 副本数 TPS(3分区) TPS(12分区) TPS(100分区) kafka.2u4g.cluster 否 否 3 250000 260000 250000 kafka.4u8g.cluster 否 否 3 330000 280000 260000 kafka.8u16g.cluster 否 否 3 480000 410000 340000 kafka.12u24g.cluster 否 否 3 570000 750000 520000 kafka.16u32g.cluster 否 否 3 840000 1000000 630000 结论:生产消息到同一个Kafka实例的不同Topic中,Topic除了分区数量,其他参数相同。随着分区数的增加,Kafka的性能通常会随之增加,当分区数达到一定程度后,继续增加分区数可能会导致性能下降。
共100000条