MapReduce服务 MRS-Flink向Kafka生产并消费数据应用开发思路:数据规划
数据规划
Flink样例工程的数据存储在Kafka组件中。Flink向Kafka组件发送数据(需要有kafka权限用户),并从Kafka组件获取数据。
- 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
- 创建Topic。
- 在服务端配置用户创建topic的权限。
开启Kerberos认证的安全集群将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。配置完后重启kafka服务。未开启Kerberos认证的普通集群无需此配置。
- 用户使用Linux命令创建topic,如果是安全集群,用户执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。
flinkuser需要用户自己创建,并拥有创建Kafka的topic权限。具体操作请参考准备Flink应用开发用户。
创建topic的命令格式:
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}
表1 参数说明 参数名
说明
{zkQuorum}
ZooKeeper集群信息,格式为IP:port。
{partitionNum}
topic的分区数。
{replicationNum}
topic中每个partition数据的副本数。
{Topic}
Topic名称。
示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,Topic名称为topic1的数据为例。
bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --partitions 5 --replication-factor 1 --topic topic1
- 在服务端配置用户创建topic的权限。
- 如果集群开启了kerberos, 执行该步骤进行安全认证,否则跳过该步骤。
- Kerberos认证配置
- 客户端配置。
在Flink配置文件“flink-conf.yaml”中,增加kerberos认证相关配置(主要在“contexts”项中增加“KafkaClient”),示例如下:
security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytabsecurity.kerberos.login.principal: adminsecurity.kerberos.login.contexts: Client,KafkaClientsecurity.kerberos.login.use-ticket-cache: false
- 运行参数。
关于“SASL_PLAINTEXT”协议的运行参数示例如下:
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007表示kafka服务器的IP:port
- 客户端配置。
- Kerberos认证配置
- MapReduce服务_什么是Kafka_如何使用Kafka
- Kafka架构_Kafka如何实现负载均衡_Kafka数据存储方式-华为云
- 分布式消息服务有哪些_分布式消息服务哪个好_分布式消息-华为云
- 分布式消息中间件实战_分布式消息实战_分布式消息-华为云
- 分布式消息系统Kafka_分布式消息系统_分布式消息kafka可以解决什么问题-华为云
- kafka是什么_kafka介绍_分布式消息服务Kafka版
- 分布式消息服务优势_分布式消息服务_消息队列-华为云
- MapReduce服务_如何使用MapReduce服务_MRS集群客户端安装与使用
- MapReduce工作原理_MapReduce是什么意思_MapReduce流程_MRS_华为云
- 数据治理中心_数据开发_数据开发示例_使用教程-华为云