检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
在分布式消息服务Kafka版提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图1所示,Top
Kafka实例Topic数量计算 Kafka实例对Topic分区数之和设置了上限,当达到上限之后,用户无法继续创建Topic。 所以,Topic数量和实例分区数上限、每个Topic的分区数有关,其中,每个Topic分区数可在创建Topic时设置,如图1,实例分区数上限参考表1。 图1 Topic的分区数
(可选)修改客户端配置文件。 在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,请根据实际情况配置。 SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config
密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。 context.load_verify_locations:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。使用
backoff.ms=1000。 request.timeout.ms 30000 结合实际业务调整 设置一个请求最大等待时间(单位为ms),超过这个时间则会抛Timeout异常。 超时时间如果设置大一些,如127000(127秒),高并发的场景中,能减少发送失败的情况。 block.on
fetchers)过多,导致CPU繁忙。 分区设置不合理,所有的生产和消费都集中在某个节点上,导致CPU利用率高。 磁盘写满的原因 业务数据增长较快,已有的磁盘空间不能满足业务数据需要。 节点内磁盘使用率不均衡,生产的消息集中在某个分区,导致分区所在的磁盘写满。 Topic的数据老化时间设置过大,保存了过多的历史数据,容易导致磁盘写满。
并发度越大。 该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。 取值范围:1~200 副本数 您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。 该参数设置为1时,表示只有一份数据。
进入“费用中心 > 续费管理”页面。 自定义查询条件。 可在“到期转按需项”页签查询已经设置到期转按需的资源。 可对“手动续费项”、“自动续费项”、“到期不续费项”页签的资源设置到期转按需的操作。 设置包年/包月资源到期后转按需。 单个资源到期转按需:选择需要更改计费模式的Kafka实例,单击操作列“更多
单击Kafka实例的名称,进入实例详情页面。 在左侧导航栏单击“流控管理 > 流控监控”,进入流控监控页面。 设置查询带宽使用量的参数。 表1 查询带宽使用量的参数说明 参数 说明 统计方式 设置带宽使用量的统计方式。 前n个:统计带宽使用量排名前x个的用户/客户端/Topic,x由您自行输入。
图1 我的配额 您可以在“服务配额”页面,查看各项资源的总配额及使用情况。 如果当前配额不能满足业务要求,请参考后续操作,申请扩大配额。 如何申请扩大配额? 登录管理控制台。 在页面右上角,选择“资源 > 我的配额”。 系统进入“服务配额”页面。 单击“申请扩大配额”。 在“新建
配置发送失败重试:retries=3 发送优化:对于时延敏感的信息,设置linger.ms=0。对于时延不敏感的信息,设置linger.ms在100~1000之间。 生产端的JVM内存要足够,避免内存不足导致发送阻塞。 时间戳设置为与当地时间一致,避免时间戳为未来时间导致消息无法老化。 尽
在告警规则页面,设置告警信息。 创建告警规则操作,请查看创建告警规则。 设置告警名称和告警的描述。 设置告警策略。 如下图所示,在进行指标监控时,如果连续1次,磁盘容量使用率原始值>=80%,则产生告警,如果未及时处理,则产生告警通知。 图1 设置告警策略 设置“发送通知”开关。
(可选)修改客户端配置文件。 在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,请根据实际情况配置。 SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config
单击Kafka实例名称,进入实例详情页面。 在左侧导航栏选择“消息查询”,进入消息列表页面。 参考表1,设置查询参数。 表1 查询消息参数说明 参数名称 说明 Topic名称 选择待查询消息的Topic名称。 分区 选择消息所在的分区。 如果未设置具体分区,查询结果显示Topic所有分区的消息。 查询方式 查询方式支持以下两种方式:
如果Topic已经设置了老化时间,此时“配置参数”中的log.retention.hours值将不对此Topic生效。仅在Topic中未设置老化时间时,“配置参数”中的log.retention.hours值才会对此Topic生效。例如:Topic01设置的老化时间为60小时,“配置参数”中的log
partition Integer Topic分区数,设置消费的并发数。 retention_time Integer 消息老化时间。 sync_replication Boolean 是否开启同步复制,开启后,客户端生产消息时相应的也要设置acks=-1,否则不生效。 默认关闭。 sync_message_flush
L证书”对话框。 图2 连接信息 参考表1,设置替换SSL证书的参数。 图3 替换SSL证书 表1 替换SSL证书参数说明 参数名称 说明 Key密码 输入制作证书中设置的keystore密码 Keystore密码 输入制作证书中设置的keystore密码 Keystore文件 导入“server
\src\main\java\com\dms\consumer 消费消息的API。 DmsProducer.java .\src\main\java\com\dms\producer 生产消息的API。 dms.sdk.consumer.properties .\src\main\resources
消费消息。 密文接入信息 如果实例开启密文接入,则需要获得连接实例的用户名与密码、SASL认证机制和Kafka安全协议。Kafka安全协议设置为“SASL_SSL”时,还需要获取SSL证书。 连接实例的用户名在Kafka实例控制台的“用户管理”页面中查看,如果忘记密码,可通过重置密码重新获得。
重启生产客户端,将生产业务迁移到新Kafka实例中。 生产业务迁移后,观察连接新Kafka实例的消费业务是否正常。 迁移结束。 常见问题:如何将持久化数据也一起迁移 如果需要将原Kafka的已消费数据也迁移到Kafka实例,可以使用Smart Connect工具,模拟成原Kafk