云服务器内容精选
-
Kafka 样例工程配置文件说明 Conf目录个各配置文件及重要参数配置说明。 Producer API配置项。 表1 producer.properties文件配置项 参数 描述 备注 security.protocol 安全协议类型 生产者使用的安全协议类型,当前Kerberos开启的模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。Kerberos未开启的模式下配置为PLAINTEXT。 kerberos.domain.name 域名 MRS 服务集群的Kerberos域名,未开启Kerberos认证的集群无需配置。 sasl.kerberos.service.name 服务名 Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。未开启Kerberos认证的集群无需配置。 Consumer API配置项。 表2 consumer.properties文件配置项 参数 描述 备注 security.protocol 安全协议类型 消费者使用的安全协议类型,当前安全模式下Kerberos开启的模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。Kerberos未开启的模式下配置为PLAINTEXT。 kerberos.domain.name 域名 MRS服务集群的Kerberos域名,未开启Kerberos认证的集群无需配置。 group.id 消费者的group id - auto.commit.interval.ms 是否自动提交offset 布尔值参数,默认值为true sasl.kerberos.service.name 服务名 Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。未开启Kerberos认证的集群无需配置。 客户端信息配置项。 表3 client.properties文件配置项 参数 描述 备注 metadata.broker.list 元数据Broker地址列表 通过此参数值,创建与元数据Broker之间的连接,需要直接访问元数据的API需要用到此参数。访问端口仅支持不开启Kerberos模式下的端口,端口说明详见Kafka安全接口介绍 kafka.client.zookeeper.principal kafka集群访问zookeeper的认证和域名 - bootstrap.servers Broker地址列表 通过此参数值,创建与Broker之间的连接。端口配置项详见Kafka安全接口介绍 zookeeper.connect zookeeper地址列表 通过此参数,访问zookeeper,末尾需要带上kafka服务名kafka MRS服务是否开启Kerberos认证配置项。 表4 kafkaSecurityMode文件配置项 参数 描述 备注 kafka.client.security.mode kafka所在的MRS服务集群是否开启Kerberos认证配置项 若开启了Kerberos认证,设置为yes,否则设置为no。 log4j日志配置项文件log4j.properties log4j日志框架的配置文件,默认情况不输入样例工程运行日志。 父主题: 开发Kafka应用
-
功能介绍 每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。 下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
-
代码样例 通过SpringBoot实现Kafka生产消费的样例代码如下: @RestController public class MessageController { private final static Logger LOG = LoggerFactory.getLogger(MessageController.class); @Autowired private KafkaProperties kafkaProperties; @GetMapping("/produce") public String produce() { Producer producerThread = new Producer(); producerThread.init(this.kafkaProperties); producerThread.start(); String message = "Start to produce messages"; LOG.info(message); return message; } @GetMapping("/consume") public String consume() { Consumer consumerThread = new Consumer(); consumerThread.init(this.kafkaProperties); consumerThread.start(); LOG.info("Start to consume messages"); // 等到180s后将consumer关闭,实际执行过程中可修改 try { Thread.sleep(consumerThread.getThreadAliveTime()); } catch (InterruptedException e) { LOG.info("Occurred InterruptedException: ", e); } finally { consumerThread.close(); } return String.format("Finished consume messages"); } }
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格