云服务器内容精选

  • 方案概述 Kafka将Topic划分为多个分区,消息被分布式存储在分区中。同一个消费组内,一个消费者可同时消费多个分区,但一个分区在同一时刻只能被一个消费者消费。 在消息处理过程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息没有被及时消费就会产生消息堆积,从而会造成消息消费延迟。 消息堆积原因 导致消息堆积的常见原因如下: 生产者短时间内生产大量消息到Topic,消费者无法及时消费。 消费者的消费能力不足(消费者并发低、消息处理时间长),导致消费效率低于生产效率。 消费者异常(如消费者故障、消费者网络异常等)导致无法消费消息。 Topic分区设置不合理,或新增分区无消费者消费。 Topic频繁重平衡导致消费效率降低。
  • 方案概述 Kafka将Topic划分为多个分区,所有消息分布式存储在各个分区上。每个分区有一个或多个副本,分布在不同的Broker节点上,每个副本存储一份全量数据,副本之间的消息数据保持同步。Kafka的Topic、分区、副本和代理的关系如下图所示: 在实际业务过程中可能会遇到各节点间或分区之间业务数据不均衡的情况,业务数据不均衡会降低Kafka集群的性能,降低资源使用率。 业务数据不均衡原因 业务中部分Topic的流量远大于其他Topic,会导致节点间的数据不均衡。 生产者发送消息时指定了分区,未指定的分区没有消息,会导致分区间的数据不均衡。 生产者发送消息时指定了消息Key,按照对应的Key发送消息至对应的分区,会导致分区间的数据不均衡。 系统重新实现了分区分配策略,但策略逻辑有问题,会导致分区间的数据不均衡。 Kafka扩容了Broker节点,新增的节点没有分配分区,会导致节点间的数据不均衡。 业务使用过程中随着集群状态的变化,多少会发生一些Leader副本的切换或迁移,会导致个别Broker节点上的数据更多,从而导致节点间的数据不均衡。
  • 实施步骤 业务数据不均衡的处理措施: 优化业务中Topic的设计,对于数据量特别大的Topic,可对业务数据做进一步的细分,并分配到不同的Topic上。 生产者生产消息时,尽量把消息均衡发送到不同的分区上,确保分区间的数据均衡。 创建Topic时,使分区的Leader副本分散到各个Broker节点中,以保障整体的数据均衡。 Kafka提供了分区重平衡的功能,可以把分区的副本重新分配到不同的Broker节点上,解决节点间负载不均衡的问题。具体分区重平衡的操作请参考修改分区平衡。
  • 步骤三:创建Topic 在“Kafka专享版”页面,单击Kafka实例的名称,进入实例详情页面。 在左侧导航栏单击“Topic管理”,进入Topic列表页。 单击“创建Topic”,弹出“创建Topic”对话框。 填写Topic名称和配置信息,配置详情请参考表7,单击“确定”,完成创建Topic。 表7 Topic参数说明 参数 说明 Topic名称 名称支持自定义,但需要符合命名规则:以英文字母、数字、下划线开头,且只能由英文字母、数字、句点、中划线、下划线组成,长度为3~200个字符。 名称不能为以下内置Topic: __consumer_offsets __transaction_state __trace __connect-status __connect-configs __connect-offsets 创建Topic后不能修改名称。 输入“topic-01”。 分区数 如果分区数与消费者数一致,分区数越大消费的并发度越大。 输入“3”。 副本数 Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。 输入“3”。 老化时间(小时) 消息的最长保留时间,消费者必须在此时间结束前消费消息,否则消息将被删除。删除的消息,无法被消费。 输入“72”。 同步复制 选择“不开启”,即Leader副本接收到消息并成功写入到本地日志后,就马上向客户端发送写入成功的消息,不需要等待所有Follower副本复制完成。 同步落盘 选择“不开启”,即生产的消息存在内存中,不会立即写入磁盘。 消息时间戳类型 选择“CreateTime”,即生产者创建消息的时间。 批处理消息最大值(字节) Kafka允许的最大批处理大小,如果在生产客户端配置文件或代码中启用消息压缩,则表示压缩后的最大批处理大小。 输入“10485760”。 描述 不设置。 图8 创建Topic
  • 步骤三:修改客户端配置文件 替换证书后,需要在客户端的“consumer.properties”和“producer.properties”文件中,分别修改“ssl.truststore.location”和“ssl.truststore.password”参数。 security.protocol=SASL_SSLssl.truststore.location=/opt/kafka_2.12-2.7.2/config/client.truststore.jksssl.truststore.password=dms@kafkassl.endpoint.identification.algorithm= ssl.truststore.location配置为client.truststore.jks证书的存放路径。 ssl.truststore.password为客户端证书的Truststore密码。 ssl.endpoint.identification.algorithm为证书 域名 校验开关,为空则表示关闭。这里需要保持关闭状态,必须设置为空。
  • 步骤五:验证接口连通性 参考使用客户端连接Kafka(关闭SASL)或者使用客户端连接Kafka(开启SASL),测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以图6为例,连接Kafka实例的地址为“124.xxx.xxx.167:9011,124.xxx.xxx.174:9011,124.xxx.xxx.57:9011”。 在Kafka实例安全组的入方向规则中放通9011端口。 连接Kafka实例的客户端已开启公网访问功能。
  • 步骤三:添加DNAT规则 在“公网NAT网关”页面,在新购买的公网NAT网关后,单击“设置规则”,进入公网NAT网关详情页。 在“DNAT规则”页签,单击“添加DNAT规则”,弹出“添加DNAT规则”对话框。 图3 公网NAT网关详情页 设置如下参数。 使用场景:选择“虚拟私有云” 端口类型:选择“具体端口” 支持协议:选择“TCP” 公网IP类型:选择“弹性公网IP”,在下拉框中选择已购买的弹性公网IP 公网端口:输入“9011” 实例类型:选择“自定义” 私网IP:输入获取Kafka实例的信息中记录的Kafka实例的一个内网连接地址 私网端口:输入“9011” 如果想要了解更多的参数信息,请参考添加DNAT规则。 图4 添加DNAT规则 单击“确定”,完成DNAT规则的添加。 DNAT规则添加成功后,在DNAT规则列表中查看此规则的状态,若“状态”为“运行中”,表示创建成功。 为获取Kafka实例的信息中记录的其他内网连接地址创建DNAT规则,每个DNAT规则需要设置不同的弹性公网IP。 创建DNAT规则的具体步骤参考2~4。 DNAT规则全部创建成功后,在“DNAT规则”页签,查看已创建的DNAT规则,并记录私网IP对应的弹性公网IP。 图5 DNAT规则列表
  • DMS for Kafka自定义策略样例 如果系统预置的DMS for Kafka权限,不满足您的授权要求,可以创建自定义策略。自定义策略中可以添加的授权项(Action)请参考细粒度策略支持的授权项。 目前华为云支持以下两种方式创建自定义策略: 可视化视图创建自定义策略:无需了解策略语法,按可视化视图导航栏选择云服务、操作、资源、条件等策略内容,可自动生成策略。 JSON视图创建自定义策略:可以在选择策略模板后,根据具体需求编辑策略内容;也可以直接在编辑框内编写JSON格式的策略内容。 具体创建步骤请参见:创建自定义策略。下面为您介绍常用的DMS for Kafka自定义策略样例。 示例1:授权用户删除实例和重启实例 { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "dms:instance:modifyStatus", "dms:instance:delete" ] } ]} 示例2:拒绝用户删除实例 拒绝策略需要同时配合其他策略使用,否则没有实际作用。用户被授予的策略中,一个授权项的作用如果同时存在Allow和Deny,则遵循Deny优先原则。 如果您给用户授予DMS FullAccess的系统策略,但不希望用户拥有DMS FullAccess中定义的删除实例权限,您可以创建一条拒绝删除实例的自定义策略,然后同时将DMS FullAccess和拒绝策略授予用户,根据Deny优先原则,则用户可以对DMS for Kafka执行除了删除实例外的所有操作。拒绝策略示例如下: { "Version": "1.1", "Statement": [ { "Effect": "Deny", "Action": [ "dms:instance:delete" ] } ]}
  • producer使用建议 同步复制客户端需要配合使用:acks=all 配置发送失败重试:retries=3 发送优化:对于时延敏感的信息,设置linger.ms=0。对于时延不敏感的信息,设置linger.ms在100~1000之间。 生产端的JVM内存要足够,避免内存不足导致发送阻塞。 时间戳设置为与当地时间一致,避免时间戳为未来时间导致消息无法老化。 尽量复用producer,不要频繁创建producer。当producer开启幂等时(生产者客户端3.0及之后的版本默认开启幂等),生产消息会在服务端创建生产者状态对象,若频繁创建producer,会导致服务端创建大量生产者状态对象后无法及时回收,服务端内存占用飙升,进而导致服务端性能急剧下降。如果不需要使用幂等功能,请将“enable.idempotence”设置为“false”。
  • 前提条件 给用户组授权之前,请您了解用户组可以添加的DMS for Kafka系统策略,并结合实际需求进行选择,DMS for Kafka支持的系统策略及策略间的对比,请参见:DMS for Kafka系统策略。若您需要对除DMS for Kafka之外的其它服务授权, IAM 支持服务的所有策略请参见权限策略。 DMS for Kafka的权限与策略基于分布式消息服务DMS,因此在IAM服务中为用户组授予DMS for Kafka权限时,请选择并使用“DMS”的权限与策略。
  • 示例流程 图1 给用户授权DMS for Kafka权限流程 创建用户组并授权 在IAM控制台创建用户组,并授予DMS for Kafka的只读权限“DMS ReadOnlyAccess”。 创建用户并加入用户组 在IAM控制台创建用户,并将其加入1中创建的用户组。 用户登录并验证权限 新创建的用户登录控制台,切换至授权区域,验证权限: 在“服务列表”中选择分布式消息服务Kafka版,进入Kafka实例主界面,单击右上角“购买Kafka实例”,尝试购买Kafka实例,如果无法购买Kafka实例(假设当前权限仅包含DMS ReadOnlyAccess),表示“DMS ReadOnlyAccess”已生效。 在“服务列表”中选择云硬盘(假设当前策略仅包含DMS ReadOnlyAccess),若提示权限不足,表示“DMS ReadOnlyAccess”已生效。 在“服务列表”中选择分布式消息服务Kafka版(假设当前策略仅包含DMS ReadOnlyAccess),进入Kafka实例主界面,如果能够查看Kafka实例列表,表示“DMS ReadOnlyAccess”已生效。
  • DMS for Kafka请求条件 您可以在创建自定义策略时,通过添加“请求条件”(Condition元素)来控制策略何时生效。请求条件包括条件键和运算符,条件键表示策略语句的 Condition元素,分为全局级条件键和服务级条件键。全局级条件键(前缀为g:)适用于所有操作,服务级条件键(前缀为服务缩写,如dms:)仅适用于对应服务的操作。运算符与条件键一起使用,构成完整的条件判断语句。 DMS for Kafka通过IAM预置了一组条件键,例如,您可以先使用dms:ssl条件键检查Kafka实例是否开启SASL,然后再允许执行操作。下表显示了适用于DMS for Kafka服务特定的条件键。 表2 DMS for Kafka请求条件 DMS for Kafka条件键 运算符 描述 dms:connector Bool Null 是否开启Smart Connect dms:publicIP Bool Null 是否开启公网 dms:ssl Bool Null 是否开启SSL
  • DMS for Kafka资源 资源是服务中存在的对象。在DMS for Kafka中,资源包括:kafka,您可以在创建自定义策略时,通过指定资源路径来选择特定资源。 表1 DMS for Kafka的指定资源与对应路径 指定资源 资源名称 资源路径 kafka 实例 【格式】 DMS:*:*:kafka:实例ID 【说明】 对于实例资源,IAM自动生成资源路径前缀DMS:*:*:kafka: 通过实例ID指定具体的资源路径,支持通配符*。例如: DMS:*:*:kafka:*表示任意Kafka实例。
  • consumer使用建议 consumer的owner线程需确保不会异常退出,避免客户端无法发起消费请求,阻塞消费。 确保处理完消息后再做消息commit,避免业务消息处理失败,无法重新拉取处理失败的消息。 通常不建议对每条消息都进行commit,如果对每条消息都进行了commit,会导致OFFSET_COMMIT请求过多,进而导致CPU使用率过高。例如:如果一个消费请求拉取1000条消息,每条都commit,则commit请求TPS是消费的1000倍,消息体越小,这个比例越大。建议隔一定条数或时间,批量commit,或打开enable.auto.commit,这样设置会存在一个缺点,即在客户端故障时,可能丢失一部分缓存的消费进度,导致重复消费。请根据业务实际情况,设置批量commit。 consumer不能频繁加入和退出group,频繁加入和退出,会导致consumer频繁做rebalance,阻塞消费。 同一消费组内consumer数量不能超过该消费组订阅的分区总数,否则会有consumer拉取不到消息。 consumer需周期poll,维持和server的心跳,避免心跳超时,导致consumer频繁加入和退出,阻塞消费。 consumer拉取的消息本地缓存应有大小限制,避免OOM(Out of Memory)。 consumer session设置为30秒,session.timeout.ms=30000。 Kafka不能保证消费重复的消息,业务侧需保证消息处理的幂等性。 消费线程退出要调用consumer的close方法,避免同一个组的其他消费者阻塞session.timeout.ms的时间。 消费组名称开头不使用特殊字符(如#),使用特殊字符可能会导致 云监控 无法展示此消费组的监控数据。
  • 迁移方案一:先迁生产,再迁消费 指先将生产消息的业务迁移到新的Kafka,原Kafka不会有新的消息生产。待原有Kafka实例的消息全部消费完成后,再将消费消息业务迁移到新的Kafka,开始消费新Kafka实例的消息。 本方案为业界通用的迁移方案,操作步骤简单,迁移过程由业务侧自主控制,整个过程中消息不会存在乱序问题,适用于对消息顺序有要求的场景。但是该方案中需要等待消费者业务直至消费完毕,存在一个时间差的问题,部分数据可能存在较大的端到端时延。 将生产客户端的Kafka连接地址修改为新Kafka实例的连接地址。 重启生产业务,使得生产者将新的消息发送到新Kafka实例中。 观察各消费组在原Kafka的消费进度,直到原Kafka中数据都已经被消费完毕。 将消费客户端的Kafka连接地址修改为新Kafka实例的连接地址。 重启消费业务,使得消费者从新Kafka实例中消费消息。 观察消费者是否能正常从新Kafka实例中获取数据。 迁移结束。