检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
测试Kafka生产速率和CPU消耗 本章节从批处理大小、是否跨AZ生产、副本数、同步/异步复制的维度进行分布式消息服务Kafka版的性能测试,对比客户端消息生产速率和服务端CPU消耗,得出性能测试结果。 测试场景一(批处理大小):相同的Kafka实例和Topic,不同的消息大小
测试Kafka实例TPS 通过以下几个场景,测试不同实例规格的TPS。 测试场景一(实例是否开启SASL):相同的Topic,实例分为开启SASL和未开启SASL 测试场景二(同步/异步复制):相同的实例,不同复制机制的Topic 测试场景三(是否同步落盘):相同的实例,不同落盘机制的Topic
测试实例性能 测试Kafka生产速率和CPU消耗 测试Kafka实例TPS
Kafka 2.7版本,适用于测试场景,不建议用于生产业务。 TPS(Transaction per second),在Kafka场景中,指每秒能写入到Kafka实例的最大消息数量。下表中TPS性能,是指以1KB大小的消息为例的每秒处理消息条数。测试场景为连接内网访问明文接入、磁盘类型为超高I/O的实例。
单机和集群Kafka实例差异概述 单机实例是分布式消费服务Kafka版提供的单代理实例,只适用体验和业务测试场景,无法保证性能和可靠性。如果需要在生产环境使用Kafka实例,建议购买集群实例。 单机实例和集群实例支持的特性和功能有部分差异,具体如表1所示。 表1 单机实例和集群实例的差异说明
长期承诺的用户。本文将介绍按需计费Kafka实例的计费规则。 适用场景 按需计费适用于具有不能中断的短期、突增或不可预测的应用或服务,例如电商抢购、临时测试、科学计算。 适用计费项 分布式消息服务Kafka版对您选择的Kafka实例和Kafka的磁盘存储空间收费。以下计费项支持按需计费。
Kafka体验版实例规格为EXP,为单代理实例,不支持Kafka Manager、Smart Connect和多AZ等功能。 Kafka体验版实例只适用体验和业务测试场景,无法保证性能和可靠性。如果需要在生产环境使用Kafka实例,建议购买按需或包年包月的Kafka实例。 Kafka体验版仅在华南-广州的区域支持,其他区域暂不支持。
a实例的最大消息数量。下表中TPS性能,是指以1KB大小的消息为例的每秒处理消息条数。测试场景为连接内网访问明文接入、磁盘类型为超高I/O的实例。如果您想要了解更多关于TPS的性能,请参考测试Kafka实例TPS。 表1 Kafka集群实例规格 规格名称 代理个数范围 单个代理TPS
区平衡。 自动平衡 登录管理控制台。 在管理控制台左上角单击,选择区域。 请选择Kafka实例所在的区域。 在管理控制台左上角单击,选择“应用中间件 > 分布式消息服务Kafka版”,进入分布式消息服务Kafka专享版页面。 在左侧导航栏单击“Kafka实例”,进入Kafka实例列表页面。
listeners IP 步骤五:验证接口连通性 参考使用客户端连接Kafka(关闭SASL)或者使用客户端连接Kafka(开启SASL),测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以图8
orMaker,否则会导致数据在Topic内无限循环复制。 实施步骤 购买一台弹性云服务器,确保弹性云服务器与源集群、目标集群网络互通。具体购买操作,请参考购买弹性云服务器。 登录弹性云服务器,安装Java JDK,并配置JAVA_HOME与PATH环境变量,使用执行用户在用户家目录下修改“
IP(使用DNAT访问) 步骤五:验证接口连通性 参考使用客户端连接Kafka(关闭SASL)或者使用客户端连接Kafka(开启SASL),测试是否可以生产和消费消息。 测试接口连通性时,注意以下几点: 连接Kafka实例的地址为“advertised.listeners IP:9011”,以图6
\src\test\java\com\dms\consumer 消费消息的测试代码。 DmsProducerTest.java .\src\test\java\com\dms\producer 生产消息的测试代码。 pom.xml .\ maven配置文件,包含Kafka客户端引用。
求什么类型的操作。 GET:请求服务器返回指定资源。 PUT:请求服务器更新指定资源。 POST:请求服务器新增资源或执行特殊操作。 DELETE:请求服务器删除指定资源,如删除对象等。 HEAD:请求服务器资源头部。 PATCH:请求服务器更新资源的部分内容。当资源不存在的时候,PATCH可能会去创建一个新的资源。
老化时间修改为72小时,分区数修改为6,新增分区分布在broker-1和broker-2上,不同步复制,不同步落盘,消息时间类型为LogAppendTime,最大批处理大小10485760。 PUT https://{endpoint}/v2/{project_id}/insta
度为4~64的字符。 user_desc String 用户描述。 role String 用户角色。 default_app Boolean 是否为默认应用。 created_time Long 创建时间。 请求示例 查询用户列表。 GET https://{endpoint}
coordinators objects 所有消费组对应的协调器列表。 表3 coordinators 参数 参数类型 描述 group_id String 消费组ID。 id Integer 对应协调器的broker id。 host String 对应协调器的地址。 port Integer 端口号。
out.println("Exception Happened when polling records: " + e); logger.error("Exception Happened when polling records:
Kafka设计初衷就是为了应对大量日志传输场景,应用通过异步方式将日志消息同步到消息服务,再通过其他组件对日志做实时或离线分析,也可用于关键日志信息收集进行应用监控。 日志同步主要有三个关键部分:日志采集客户端,Kafka消息队列以及后端的日志处理应用。 日志采集客户端,负责用户各类应用服务的日志数据采集,
"topic_other_configs" : [ { "name" : "message.timestamp.type", "value" : "LogAppendTime" }, { "name" : "max.message.bytes", "value" : 10485760