检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
(可选)修改客户端配置文件。 在Kafka控制台的“基本信息 > 连接信息”中查看Kafka安全协议,两种安全协议对应的配置文件设置有所不同,请根据实际情况配置。 SASL_PLAINTEXT:如果已经设置了用户名和密码,请跳过此步骤,执行2。否则在Kafka客户端的“/config”目录中创建“ssl-user-config
若“auto.create.groups.enable”设置为“true”,当消费组的状态为“EMPTY”且从未提交过offset,系统将在十分钟后自动删除该消费组。 若“auto.create.groups.enable”设置为“false”,系统不会自动删除消费组。如果需要删除消费组,需要您手动删除。
SESSION_TIMEOUT_MS_CONFIG配置的时间,默认为1000毫秒)后,服务端才认为此消费组客户端已下线。 调用方法 请参见如何调用API。 URI PUT /v2/kafka/{project_id}/instances/{instance_id}/groups/
查询指定消费组的消费成员 功能介绍 查询指定消费组的消费成员。 调用方法 请参见如何调用API。 URI GET /v2/{engine}/{project_id}/instances/{instance_id}/groups/{group}/members 表1 路径参数 参数
是否支持跨Region访问? Kafka实例是否支持跨VPC访问? Kafka实例是否支持不同的子网? Kafka是否支持Kerberos认证,如何开启认证? Kafka实例是否支持无密码访问? 开启公网访问后,在哪查看公网IP地址? Kafka支持服务端认证客户端吗? 连接开启SASL_SSL的Kafka实例时,ssl
时,“作用范围”需要选择“区域级项目”,然后在指定区域(如华北-北京1)对应的项目(cn-north-1)中设置相关权限,并且该权限仅对此项目生效;如果在“所有项目”中设置权限,则该权限在所有区域项目中都生效。访问DMS for Kafka时,需要先切换至授权区域。 权限根据授权精细程度分为角色和策略。
partition Integer topic分区数,设置消费的并发数。 retention_time Integer 消息老化时间。 sync_replication Boolean 是否开启同步复制,开启后,客户端生产消息时相应的也要设置acks=-1,否则不生效,默认关闭。 sync_message_flush
查询所有消费组 功能介绍 查询所有消费组。 调用方法 请参见如何调用API。 URI GET /v2/{engine}/{project_id}/instances/{instance_id}/groups 表1 路径参数 参数 是否必选 参数类型 描述 engine 是 String
低。 partition 否 Integer topic分区数,设置消费的并发数。取值范围:1-200。 sync_replication 否 Boolean 是否开启同步复制,开启后,客户端生产消息时相应的也要设置acks=-1,否则不生效,默认关闭。 retention_time
并发度越大。 该参数设置为1时,消费消息时会按照先入先出的顺序进行消费。 取值范围:1~200 副本数 您可以为每个Topic设置副本的数量,Kafka会自动在每个副本上备份数据,当其中一个Broker故障时数据依然是可用的,副本数越大可靠性越高。 该参数设置为1时,表示只有一份数据。
查询指定实例 功能介绍 查询指定实例的详细信息。 当前页面API为历史版本API,未来可能停止维护。请使用查询指定实例。 调用方法 请参见如何调用API。 URI GET /v2/{engine}/{project_id}/instances/{instance_id} 表1 路径参数
副本的Topic需要在生产客户端配置重试机制,方法如下: 生产客户端为Kafka开源客户端时,检查是否配置retries参数,建议此参数值设置为3~5。 生产客户端为Flink客户端时,检查是否配置重启策略,配置重启策略可以参考如下代码。 StreamExecutionEnvironment
https://{endpoint}/v1/{project_id}/vpc-endpoint-services 请参考以下说明设置请求参数,其他参数根据实际情况设置。 port_id:输入6中获取的其中一个Port ID。 vpc_id:输入8中获取的VPC ID。 server_type:输入“VM”。
消息。 batch.size:每次批量发送消息的大小(单位为字节)。 linger.ms:两次发送时间间隔。 topic:创建Topic中设置的Topic名称。 num-records:总共需要发送的消息数。 record-size:每条消息的大小。 throughput:每秒发送的消息数。
被踢出消费组,此时可能会导致消费异常,需要重点关注。 消费者被服务端认为异常从而被踢出消费组的场景如下: 未能及时发送心跳请求。 消费者以设置的heartbeat.interval.ms为间隔向broker发送心跳请求,如果broker在session.timeout.ms时间内
在分布式消息服务Kafka版提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 如图1所示,Top
创建实例 功能介绍 创建实例。 该接口支持创建按需和包周期两种计费方式的实例。 调用方法 请参见如何调用API。 URI POST /v2/{engine}/{project_id}/instances 表1 路径参数 参数 是否必选 参数类型 描述 engine 是 String
查询指定实例 功能介绍 查询指定实例的详细信息。 调用方法 请参见如何调用API。 URI GET /v2/{project_id}/instances/{instance_id} 表1 路径参数 参数 是否必选 参数类型 描述 project_id 是 String 项目ID,获取方式请参见获取项目ID。
配置,如果Kafka实例没有开启密文接入,使用的是不加密连接,请注释相关代码;如果Kafka实例开启了密文接入,则必须使用加密方式连接,请设置相关参数。 生产消息配置文件(对应生产消息代码中的dms.sdk.producer.properties文件) 以下粗体部分为不同Kafk
本章节介绍在对消费消息实时性要求不高的场景中,如何优化消费者Polling,减少消息较少或者没有消息时的资源浪费。 Logstash对接Kafka生产消费消息 Kafka实例可以作为Logstash输入源,也可以作为Logstash输出源。本章节介绍Logstash如何对接Kafka实例生产消费消息。